Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-19 Thread Guozhang Wang
I'm fine with always removing threads at DEAD state from
localThreadsMetadata().

On the other hand, if we want to give users more info for debugging, we can
consider eating the complexity ourselves by guaranteeing that, within the
user-registered uncaught exception handler, localThreadsMetadata() would
include the thread that is dying / throwing at that moment.
Implementation-wise, we can always register our own internal handler, which
calls the user-registered one if it is set, and only after that removes the
thread metadata from the instance cache. Personally I think it is a bit
too much, so I'm in favor of "adding it later when people complain" :)
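The ordering described above can be sketched as a small Python model. It is hypothetical, not Kafka Streams code: `ThreadCache` stands in for the instance-level thread metadata cache, `make_internal_handler` for the internal handler that wraps the optional user-registered one, and the thread names are illustrative. The point it shows is that the user's handler still sees the dying thread, and the cleanup happens only after that handler returns or raises.

```python
from typing import Callable, Dict, Optional

UserHandler = Callable[[str, BaseException], None]


class ThreadCache:
    """Hypothetical stand-in for the instance-level thread metadata cache."""

    def __init__(self) -> None:
        self.states: Dict[str, str] = {}  # thread name -> thread state

    def snapshot(self) -> Dict[str, str]:
        # Plays the role of localThreadsMetadata(): a copy of the current view.
        return dict(self.states)


def make_internal_handler(cache: ThreadCache,
                          user_handler: Optional[UserHandler]) -> UserHandler:
    """Wrap the optional user handler so that it runs before the cleanup."""

    def internal_handler(thread_name: str, error: BaseException) -> None:
        cache.states[thread_name] = "DEAD"  # the thread is dying
        try:
            if user_handler is not None:
                # The dying thread is still visible to the user's handler here.
                user_handler(thread_name, error)
        finally:
            # Removed only after the user's handler returns (or raises).
            cache.states.pop(thread_name, None)

    return internal_handler


cache = ThreadCache()
cache.states = {"app-StreamThread-1": "RUNNING", "app-StreamThread-2": "RUNNING"}
seen = []
handler = make_internal_handler(cache, lambda name, err: seen.append(cache.snapshot()))
handler("app-StreamThread-2", RuntimeError("simulated failure"))
print(seen[0])           # the user handler saw both threads
print(cache.snapshot())  # afterwards only the live thread is left
```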


Guozhang


On Fri, Sep 18, 2020 at 5:17 PM Sophie Blee-Goldman wrote:

> Makes sense to me :)
>
> On Thu, Sep 17, 2020 at 9:34 AM Bruno Cadonna wrote:
>
> > Hi Sophie,
> >
> > Thank you for the feedback! I replied inline.
> >
> > Best,
> > Bruno
> >
> > On 16.09.20 19:19, Sophie Blee-Goldman wrote:
> > >>
> > >> We guarantee that the metadata of the dead stream threads will be
> > >> returned by KafkaStreams#localThreadsMetadata() at least until the next
> > >> call to KafkaStreams#addStreamThread() or
> > >> KafkaStreams#removeStreamThread() after the stream thread transitioned
> > >> to DEAD
> > >
> > >
> > > This seems kind of tricky...personally I would find it pretty odd if I
> > > queried the local thread metadata and found two threads, A (alive) and
> > > B (dead), and then called removeStreamThread() and now suddenly I have
> > > zero. Or if I call addStreamThread and now I still have two threads.
> > >
> >
> > The behavior might be unusual, but it is well defined and not random by
> > any means.
> >
> > > Both of those results seem to indicate that only live threads "count"
> > > and are returned by localThreadsMetadata(). But in reality we do
> > > temporarily keep the dead thread, but only for the arbitrary amount of
> > > time until the next time you want to add or remove some other stream
> > > thread? That seems like a weird side effect of the
> > > add/removeStreamThread APIs.
> > >
> >
> > This is not a side effect that just happens to occur. This is a
> > guarantee that users get. It gives users the possibility to retrieve the
> > metadata of the dead stream threads since the last call to
> > add/removeStreamThread. Admittedly, this guarantee overlaps with the
> > current/planned implementation, but that is more of a coincidence.
> >
> > I would be more concerned about add/removeStreamThread being called
> > from different threads, which could happen if an uncaught exception
> > handler that wants to replace a stream thread runs concurrently with a
> > thread that is responsible for automated scaling up.
> >
> > > If we really think users might want to log the metadata of dead
> > > threads, then let's just do that for them or give them a way to do
> > > exactly that.
> > >
> >
> > Logging the metadata of dead stream threads for the user is a valid
> > alternative. Giving users a way to do exactly that is hard because the
> > StreamThread class is not part of the public API. They would always need
> > to call a method on the KafkaStreams object, where we already have
> > localThreadsMetadata().
> >
> > > I'm not that concerned about the backwards compatibility of removing
> > > dead threads from the localThreadsMetadata, because I find it hard to
> > > believe that users do anything other than just skip over them in the
> > > list (set?) that gets returned. But maybe someone can chime in with an
> > > example use case.
> > >
> >
> > I am also not too concerned about backwards compatibility. That
> > would indeed be a side effect of the current proposal.
> >
> > > I'm actually even a little skeptical that any users might want to log
> > > the metadata of a dead thread, since all of the metadata is only useful
> > > for IQ on live threads or already covered by other easily discoverable
> > > logging elsewhere, or both.
> > >
> >
> > Having said all of the above, I actually agree with you that there is
> > not much interesting information in the metadata of a dead stream
> > thread. The name of the stream thread is known in the uncaught
> > exception handler. The names of the clients, like the consumer etc., used
> > by the stream thread can be derived from the name of the stream thread.
> > Finally, the sets of active and standby tasks should be empty for a dead
> > stream thread.
> >
> > Hence, I backpedal and propose to filter out dead stream threads from
> > localThreadsMetadata(). WDYT?
> >
> > > On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna wrote:
> > >
> > >> Hi again,
> > >>
> > >> I just realized that if we filter out DEAD stream threads in
> > >> localThreadsMetadata(), users cannot log the metadata of dying stream
> > >> threads in the uncaught exception handler.
> > >>
> > >> I realized this thanks to the example Guozhang requested in the KIP.
> > >> 


2020-09-18 Thread Sophie Blee-Goldman
Makes sense to me :)

On Thu, Sep 17, 2020 at 9:34 AM Bruno Cadonna wrote:

> Hi Sophie,
>
> Thank you for the feedback! I replied inline.
>
> Best,
> Bruno
>
> On 16.09.20 19:19, Sophie Blee-Goldman wrote:
> >>
> >> We guarantee that the metadata of the dead stream threads will be
> >> returned by KafkaStreams#localThreadsMetadata() at least until the next
> >> call to KafkaStreams#addStreamThread() or
> >> KafkaStreams#removeStreamThread() after the stream thread transitioned to
> >> DEAD
> >
> >
> > This seems kind of tricky...personally I would find it pretty odd if I
> > queried the local thread metadata and found two threads, A (alive) and B
> > (dead), and then called removeStreamThread() and now suddenly I have
> > zero. Or if I call addStreamThread and now I still have two threads.
> >
>
> The behavior might be unusual, but it is well defined and not random by
> any means.
>
> > Both of those results seem to indicate that only live threads "count" and
> > are returned by localThreadsMetadata(). But in reality we do temporarily
> > keep the dead thread, but only for the arbitrary amount of time until the
> > next time you want to add or remove some other stream thread? That seems
> > like a weird side effect of the add/removeStreamThread APIs.
> >
>
> This is not a side effect that just happens to occur. This is a
> guarantee that users get. It gives users the possibility to retrieve the
> metadata of the dead stream threads since the last call to
> add/removeStreamThread. Admittedly, this guarantee overlaps with the
> current/planned implementation, but that is more of a coincidence.
>
> I would be more concerned about add/removeStreamThread being called
> from different threads, which could happen if an uncaught exception
> handler that wants to replace a stream thread runs concurrently with a
> thread that is responsible for automated scaling up.
>
> > If we really think users might want to log the metadata of dead threads,
> > then let's just do that for them or give them a way to do exactly that.
> >
>
> Logging the metadata of dead stream threads for the user is a valid
> alternative. Giving users a way to do exactly that is hard because the
> StreamThread class is not part of the public API. They would always need
> to call a method on the KafkaStreams object, where we already have
> localThreadsMetadata().
>
> > I'm not that concerned about the backwards compatibility of removing dead
> > threads from the localThreadsMetadata, because I find it hard to believe
> > that users do anything other than just skip over them in the list (set?)
> > that gets returned. But maybe someone can chime in with an example use
> > case.
> >
>
> I am also not too concerned about backwards compatibility. That
> would indeed be a side effect of the current proposal.
>
> > I'm actually even a little skeptical that any users might want to log the
> > metadata of a dead thread, since all of the metadata is only useful for IQ
> > on live threads or already covered by other easily discoverable logging
> > elsewhere, or both.
> >
>
> Having said all of the above, I actually agree with you that there is
> not much interesting information in the metadata of a dead stream
> thread. The name of the stream thread is known in the uncaught
> exception handler. The names of the clients, like the consumer etc., used
> by the stream thread can be derived from the name of the stream thread.
> Finally, the sets of active and standby tasks should be empty for a dead
> stream thread.
>
> Hence, I backpedal and propose to filter out dead stream threads from
> localThreadsMetadata(). WDYT?
>
> > On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna wrote:
> >
> >> Hi again,
> >>
> >> I just realized that if we filter out DEAD stream threads in
> >> localThreadsMetadata(), users cannot log the metadata of dying stream
> >> threads in the uncaught exception handler.
> >>
> >> I realized this thanks to the example Guozhang requested in the KIP.
> >> Thank you for that, Guozhang!
> >>
> >> Hence, I adapted the KIP as follows:
> >>
> >> - We do not filter out DEAD stream threads in
> >> KafkaStreams#localThreadsMetadata()
> >>
> >> - We guarantee that the metadata of the dead stream threads will be
> >> returned by KafkaStreams#localThreadsMetadata() at least until the next
> >> call to KafkaStreams#addStreamThread() or
> >> KafkaStreams#removeStreamThread() after the stream thread transitioned
> >> to DEAD. Besides giving users the opportunity to log the metadata of a
> >> dying stream thread in its uncaught exception handler, this guarantee
> >> makes KafkaStreams#localThreadsMetadata() completely backward compatible
> >> with the current behavior, because if KafkaStreams#addStreamThread() and
> >> KafkaStreams#removeStreamThread() are never called,
> >> KafkaStreams#localThreadsMetadata() will also return the metadata of all
> >> stream threads that have ever died, which corresponds to the current
> >> 


2020-09-17 Thread Bruno Cadonna

Hi Sophie,

Thank you for the feedback! I replied inline.

Best,
Bruno

On 16.09.20 19:19, Sophie Blee-Goldman wrote:


>> We guarantee that the metadata of the dead stream threads will be
>> returned by KafkaStreams#localThreadsMetadata() at least until the next
>> call to KafkaStreams#addStreamThread() or
>> KafkaStreams#removeStreamThread() after the stream thread transitioned to
>> DEAD
>
> This seems kind of tricky...personally I would find it pretty odd if I
> queried the local thread metadata and found two threads, A (alive) and B
> (dead), and then called removeStreamThread() and now suddenly I have
> zero. Or if I call addStreamThread and now I still have two threads.



The behavior might be unusual, but it is well defined and not random by 
any means.



> Both of those results seem to indicate that only live threads "count" and
> are returned by localThreadsMetadata(). But in reality we do temporarily
> keep the dead thread, but only for the arbitrary amount of time until the
> next time you want to add or remove some other stream thread? That seems
> like a weird side effect of the add/removeStreamThread APIs.



This is not a side effect that just happens to occur. This is a
guarantee that users get. It gives users the possibility to retrieve the
metadata of the dead stream threads since the last call to
add/removeStreamThread. Admittedly, this guarantee overlaps with the
current/planned implementation, but that is more of a coincidence.


I would be more concerned about add/removeStreamThread being called
from different threads, which could happen if an uncaught exception
handler that wants to replace a stream thread runs concurrently with a
thread that is responsible for automated scaling up.
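A minimal Python sketch of one way to keep such concurrent calls safe, assuming a hypothetical `StreamsClient` model in which a single lock serializes every change to the thread bookkeeping. One thread plays an uncaught exception handler that replaces a stream thread (remove, then add) and another plays an autoscaler that adds threads; the class, its method names, and the locking scheme are illustrative assumptions rather than the KIP's design.

```python
import threading
from typing import List, Optional


class StreamsClient:
    """Hypothetical model: one lock serializes every thread bookkeeping change."""

    def __init__(self, client_id: str) -> None:
        self._client_id = client_id
        self._lock = threading.Lock()
        self._next_index = 1
        self._threads: List[str] = []

    def add_stream_thread(self) -> str:
        with self._lock:
            name = f"{self._client_id}-StreamThread-{self._next_index}"
            self._next_index += 1
            self._threads.append(name)
            return name

    def remove_stream_thread(self) -> Optional[str]:
        with self._lock:
            return self._threads.pop() if self._threads else None

    def thread_names(self) -> List[str]:
        with self._lock:
            return list(self._threads)


client = StreamsClient("app")
client.add_stream_thread()
client.add_stream_thread()


def exception_handler_loop() -> None:
    # Replaces a stream thread 500 times: remove one, then add a new one.
    for _ in range(500):
        client.remove_stream_thread()
        client.add_stream_thread()


def autoscaler_loop() -> None:
    # Scales up by 500 threads while the handler is running.
    for _ in range(500):
        client.add_stream_thread()


workers = [threading.Thread(target=exception_handler_loop),
           threading.Thread(target=autoscaler_loop)]
for w in workers:
    w.start()
for w in workers:
    w.join()

names = client.thread_names()
print(len(names), len(set(names)))  # 502 threads, all with distinct names
```

Because names are assigned under the same lock as the list update, neither caller can observe a half-finished change or hand out a duplicate name.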



> If we really think users might want to log the metadata of dead threads,
> then let's just do that for them or give them a way to do exactly that.



Logging the metadata of dead stream threads for the user is a valid
alternative. Giving users a way to do exactly that is hard because the
StreamThread class is not part of the public API. They would always need
to call a method on the KafkaStreams object, where we already have
localThreadsMetadata().



> I'm not that concerned about the backwards compatibility of removing dead
> threads from the localThreadsMetadata, because I find it hard to believe
> that users do anything other than just skip over them in the list (set?)
> that gets returned. But maybe someone can chime in with an example use
> case.



I am also not too concerned about backwards compatibility. That
would indeed be a side effect of the current proposal.



> I'm actually even a little skeptical that any users might want to log the
> metadata of a dead thread, since all of the metadata is only useful for IQ
> on live threads or already covered by other easily discoverable logging
> elsewhere, or both.



Having said all of the above, I actually agree with you that there is
not much interesting information in the metadata of a dead stream
thread. The name of the stream thread is known in the uncaught
exception handler. The names of the clients, like the consumer etc., used
by the stream thread can be derived from the name of the stream thread.
Finally, the sets of active and standby tasks should be empty for a dead
stream thread.


Hence, I backpedal and propose to filter out dead stream threads from 
localThreadsMetadata(). WDYT?
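A minimal Python sketch of the proposed filtering, assuming a hypothetical `ThreadMetadata` stand-in that carries a thread name and a state string (the Java `ThreadMetadata` class exposes the same two pieces of information through `threadName()` and `threadState()`). DEAD threads are hidden whether or not the client has already dropped them internally, which is what makes the result independent of timing.

```python
from dataclasses import dataclass
from typing import Iterable, Set


@dataclass(frozen=True)
class ThreadMetadata:
    """Hypothetical Python stand-in for the Java ThreadMetadata class."""
    thread_name: str
    thread_state: str


def local_threads_metadata(bookkept: Iterable[ThreadMetadata]) -> Set[ThreadMetadata]:
    # DEAD threads are hidden even if the client has not removed them yet,
    # so the result does not depend on when the cleanup happens.
    return {m for m in bookkept if m.thread_state != "DEAD"}


bookkept = [
    ThreadMetadata("app-StreamThread-1", "RUNNING"),
    ThreadMetadata("app-StreamThread-2", "DEAD"),     # died, not cleaned up yet
    ThreadMetadata("app-StreamThread-3", "CREATED"),
]
visible = local_threads_metadata(bookkept)
print(sorted(m.thread_name for m in visible))
```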



> On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna wrote:
>
>> Hi again,
>>
>> I just realized that if we filter out DEAD stream threads in
>> localThreadsMetadata(), users cannot log the metadata of dying stream
>> threads in the uncaught exception handler.
>>
>> I realized this thanks to the example Guozhang requested in the KIP.
>> Thank you for that, Guozhang!
>>
>> Hence, I adapted the KIP as follows:
>>
>> - We do not filter out DEAD stream threads in
>> KafkaStreams#localThreadsMetadata()
>>
>> - We guarantee that the metadata of the dead stream threads will be
>> returned by KafkaStreams#localThreadsMetadata() at least until the next
>> call to KafkaStreams#addStreamThread() or
>> KafkaStreams#removeStreamThread() after the stream thread transitioned to
>> DEAD. Besides giving users the opportunity to log the metadata of a
>> dying stream thread in its uncaught exception handler, this guarantee
>> makes KafkaStreams#localThreadsMetadata() completely backward compatible
>> with the current behavior, because if KafkaStreams#addStreamThread() and
>> KafkaStreams#removeStreamThread() are never called,
>> KafkaStreams#localThreadsMetadata() will also return the metadata of all
>> stream threads that have ever died, which corresponds to the current
>> behavior.
>>
>> - We guarantee that dead stream threads are removed from a Kafka Streams
>> client at the latest after the next call to KafkaStreams#addStreamThread()
>> or KafkaStreams#removeStreamThread() following the transition of the
>> stream thread to DEAD. This guarantees that the number of maintained
>> stream threads does not grow indefinitely.
>>
>> Best,
>> Bruno
>>
>> On 16.09.20 09:23, Bruno Cadonna wrote:
>>> Hi Guozhang,
>>>
>>> Good


2020-09-16 Thread Sophie Blee-Goldman
>
> We guarantee that the metadata of the dead stream threads will be
> returned by KafkaStreams#localThreadsMetadata() at least until the next
> call to KafkaStreams#addStreamThread() or
> KafkaStreams#removeStreamThread() after the stream thread transitioned to
> DEAD


This seems kind of tricky...personally I would find it pretty odd if I
queried the local thread metadata and found two threads, A (alive) and B
(dead), and then called removeStreamThread() and now suddenly I have zero.
Or if I call addStreamThread and now I still have two threads.

Both of those results seem to indicate that only live threads "count" and
are returned by localThreadsMetadata(). But in reality we do temporarily
keep the dead thread, but only for the arbitrary amount of time until the
next time you want to add or remove some other stream thread? That seems
like a weird side effect of the add/removeStreamThread APIs.

If we really think users might want to log the metadata of dead threads,
then let's just do that for them or give them a way to do exactly that.

I'm not that concerned about the backwards compatibility of removing dead
threads from the localThreadsMetadata, because I find it hard to believe
that users do anything other than just skip over them in the list (set?)
that gets returned. But maybe someone can chime in with an example use case.

I'm actually even a little skeptical that any users might want to log the
metadata of a dead thread, since all of the metadata is only useful for IQ
on live threads or already covered by other easily discoverable logging
elsewhere, or both.

On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna wrote:

> Hi again,
>
> I just realized that if we filter out DEAD stream threads in
> localThreadsMetadata(), users cannot log the metadata of dying stream
> threads in the uncaught exception handler.
>
> I realized this thanks to the example Guozhang requested in the KIP.
> Thank you for that, Guozhang!
>
> Hence, I adapted the KIP as follows:
>
> - We do not filter out DEAD stream threads in
> KafkaStreams#localThreadsMetadata()
>
> - We guarantee that the metadata of the dead stream threads will be
> returned by KafkaStreams#localThreadsMetadata() at least until the next
> call to KafkaStreams#addStreamThread() or
> KafkaStreams#removeStreamThread() after the stream thread transitioned
> to DEAD. Besides giving users the opportunity to log the metadata of a
> dying stream thread in its uncaught exception handler, this guarantee
> makes KafkaStreams#localThreadsMetadata() completely backward compatible
> with the current behavior, because if KafkaStreams#addStreamThread() and
> KafkaStreams#removeStreamThread() are never called,
> KafkaStreams#localThreadsMetadata() will also return the metadata of all
> stream threads that have ever died, which corresponds to the current
> behavior.
>
> - We guarantee that dead stream threads are removed from a Kafka Streams
> client at the latest after the next call to KafkaStreams#addStreamThread()
> or KafkaStreams#removeStreamThread() following the transition of the
> stream thread to DEAD. This guarantees that the number of maintained
> stream threads does not grow indefinitely.
>
>
> Best,
> Bruno
>
>
>
> On 16.09.20 09:23, Bruno Cadonna wrote:
> > Hi Guozhang,
> >
> > Good point! I would propose to filter out DEAD stream threads in
> > localThreadsMetadata() to get consistent results that do not depend on
> > timing. I will update the KIP accordingly.
> >
> > Best,
> > Bruno
> >
> > On 16.09.20 06:02, Guozhang Wang wrote:
> >> Thanks Bruno, your replies make sense to me. As for
> >> localThreadsMetadata() itself, I'd like to clarify whether it would
> >> return all still-bookkept threads, or whether it would specifically
> >> filter out DEAD threads even if they are not yet removed.
> >>
> >> Otherwise, the KIP LGTM.
> >>
> >> Guozhang
> >>
> >> On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> Thank you for your feedback. I replied inline.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 09.09.20 23:43, Guozhang Wang wrote:
> >>>> Hello Bruno,
> >>>>
> >>>> Finally got some time to review your KIP and the discussion thread
> >>>> now. A few comments below:
> >>>>
> >>>> 1) I'm with Matthias about the newly added numberOfAliveStreamThreads
> >>>> vs. the existing localThreadsMetadata: to me it seems we can always
> >>>> achieve the first based on the second. It does not seem worthwhile to
> >>>> provide some "syntactic sugar" in the API; we can just let users do the
> >>>> filtering themselves.
> >>>
> >>> I am not married to that method. I removed it.
> >>>
> >>>> Furthermore, I'm wondering what's the rationale behind removing the
> >>>> DEAD threads from localThreadsMetadata()? Personally I feel returning
> >>>> all threads, including those that were ever closed, either due to an
> >>>> exception or due to removeStreamThread, would be beneficial for
> >>>> debugging purposes, as long as within the


2020-09-16 Thread Bruno Cadonna

Hi again,

I just realized that if we filter out DEAD stream threads in 
localThreadsMetadata(), users cannot log the metadata of dying stream 
threads in the uncaught exception handler.


I realized this thanks to the example Guozhang requested in the KIP. 
Thank you for that, Guozhang!


Hence, I adapted the KIP as follows:

- We do not filter out DEAD stream threads in 
KafkaStreams#localThreadsMetadata()


- We guarantee that the metadata of the dead stream threads will be
returned by KafkaStreams#localThreadsMetadata() at least until the next
call to KafkaStreams#addStreamThread() or
KafkaStreams#removeStreamThread() after the stream thread transitioned to
DEAD. Besides giving users the opportunity to log the metadata of a
dying stream thread in its uncaught exception handler, this guarantee
makes KafkaStreams#localThreadsMetadata() completely backward compatible
with the current behavior, because if KafkaStreams#addStreamThread() and
KafkaStreams#removeStreamThread() are never called,
KafkaStreams#localThreadsMetadata() will also return the metadata of all
stream threads that have ever died, which corresponds to the current
behavior.

- We guarantee that dead stream threads are removed from a Kafka Streams
client at the latest after the next call to KafkaStreams#addStreamThread()
or KafkaStreams#removeStreamThread() following the transition of the
stream thread to DEAD. This guarantees that the number of maintained
stream threads does not grow indefinitely.
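The two guarantees above can be modeled with a small hypothetical Python class; `RetainUntilNextCall` and its methods are illustrative names, not Kafka Streams API. DEAD threads stay visible until the next add/remove call, and that call purges them before doing anything else. The usage checks the case without further calls (the dead thread stays visible, as today) and the counts this rule produces right after one remove or one add when one thread is alive and one is dead.

```python
from typing import Dict, Optional


class RetainUntilNextCall:
    """Hypothetical model: DEAD threads remain in the metadata until the next
    add/remove call, which purges them first."""

    def __init__(self) -> None:
        self.states: Dict[str, str] = {}  # thread name -> state
        self._next = 1

    def _purge_dead(self) -> None:
        self.states = {n: s for n, s in self.states.items() if s != "DEAD"}

    def add_stream_thread(self) -> str:
        self._purge_dead()
        name = f"T{self._next}"
        self._next += 1
        self.states[name] = "RUNNING"
        return name

    def remove_stream_thread(self) -> Optional[str]:
        self._purge_dead()
        name = next(iter(self.states), None)  # shut down one live thread
        if name is not None:
            del self.states[name]
        return name

    def local_threads_metadata(self) -> Dict[str, str]:
        return dict(self.states)


def one_alive_one_dead() -> RetainUntilNextCall:
    client = RetainUntilNextCall()
    client.add_stream_thread()            # T1 stays RUNNING
    dying = client.add_stream_thread()    # T2 ...
    client.states[dying] = "DEAD"         # ... dies from an uncaught exception
    return client


# No further add/remove call: the dead thread stays visible.
c0 = one_alive_one_dead()
print(c0.local_threads_metadata())        # T1 RUNNING, T2 DEAD

# The next call purges the dead thread first.
c1 = one_alive_one_dead()
c1.remove_stream_thread()
print(len(c1.local_threads_metadata()))   # 0: T2 purged, T1 removed

c2 = one_alive_one_dead()
c2.add_stream_thread()
print(len(c2.local_threads_metadata()))   # 2: T2 purged, T3 added
```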



Best,
Bruno



On 16.09.20 09:23, Bruno Cadonna wrote:
> Hi Guozhang,
>
> Good point! I would propose to filter out DEAD stream threads in
> localThreadsMetadata() to get consistent results that do not depend on
> timing. I will update the KIP accordingly.
>
> Best,
> Bruno
>
> On 16.09.20 06:02, Guozhang Wang wrote:
>> Thanks Bruno, your replies make sense to me. As for
>> localThreadsMetadata() itself, I'd like to clarify whether it would
>> return all still-bookkept threads, or whether it would specifically
>> filter out DEAD threads even if they are not yet removed.
>>
>> Otherwise, the KIP LGTM.
>>
>> Guozhang
>>
>> On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna wrote:
>>> Hi Guozhang,
>>>
>>> Thank you for your feedback. I replied inline.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 09.09.20 23:43, Guozhang Wang wrote:
>>>> Hello Bruno,
>>>>
>>>> Finally got some time to review your KIP and the discussion thread
>>>> now. A few comments below:
>>>>
>>>> 1) I'm with Matthias about the newly added numberOfAliveStreamThreads
>>>> vs. the existing localThreadsMetadata: to me it seems we can always
>>>> achieve the first based on the second. It does not seem worthwhile to
>>>> provide some "syntactic sugar" in the API; we can just let users do the
>>>> filtering themselves.
>>>
>>> I am not married to that method. I removed it.
>>>
>>>> Furthermore, I'm wondering what's the rationale behind removing the
>>>> DEAD threads from localThreadsMetadata()? Personally I feel returning
>>>> all threads, including those that were ever closed, either due to an
>>>> exception or due to removeStreamThread, would be beneficial for
>>>> debugging purposes, as long as within the lifetime of an instance we
>>>> expect the number of such dead threads will not increase linearly ---
>>>> and if we agree with that, maybe we can rename "removeStreamThread" to
>>>> something like "terminateStreamThread", indicating it is only
>>>> terminated but not removed --- and of course if users do not want to
>>>> see those DEAD threads they can always filter them out. I'm just
>>>> proposing that we should still leave the door open for those who want
>>>> to check those ever terminated threads.
>>>
>>> I actually think the number of dead stream threads might increase
>>> linearly. Assume users have a systematic error that continuously kills a
>>> stream thread and they blindly start a new stream thread in the uncaught
>>> exception handler. This scenario might be a mistake, but if the
>>> systematic error does not occur at a high rate, it could also be a
>>> strategy to keep the application running during the investigation of the
>>> systematic error.
>>>
>>> IMO, removing dead stream threads makes Kafka Streams more robust
>>> because it prevents a possibly unbounded increase of memory usage. If
>>> users want to debug the dead stream threads, they can monitor the number
>>> of dead threads with the metric proposed in the KIP and they could
>>> additionally log the metadata of the dying stream thread in the uncaught
>>> exception handler. I do not think that there is a need to keep dead
>>> stream threads around.
>>>
>>>> 2) I think it would help to write down some example user code in the
>>>> exception handler to illustrate how this would be implemented. For
>>>> example, we know that in practice the handler needs to maintain a "this"
>>>> reference to the instance anyway in order to shut down the whole
>>>> instance or add/terminate threads dynamically, but I want to see if we
>>>> have listed all possible call paths, like a) a thread's handler logic
>>>> to terminate another thread, b) a thread's handler to add new threads,
>>>> etc., and that they are all appropriately supported without deadlocks.
>>>
>>> I added an example for an uncaught exception handler that adds a stream
>>> thread to the


2020-09-16 Thread Bruno Cadonna

Hi Guozhang,

Good point! I would propose to filter out DEAD stream threads in 
localThreadsMetadata() to get consistent results that do not depend on 
timing. I will update the KIP accordingly.


Best,
Bruno

On 16.09.20 06:02, Guozhang Wang wrote:
> Thanks Bruno, your replies make sense to me. As for
> localThreadsMetadata() itself, I'd like to clarify whether it would
> return all still-bookkept threads, or whether it would specifically
> filter out DEAD threads even if they are not yet removed.
>
> Otherwise, the KIP LGTM.
>
> Guozhang
>
> On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna wrote:
>> Hi Guozhang,
>>
>> Thank you for your feedback. I replied inline.
>>
>> Best,
>> Bruno
>>
>> On 09.09.20 23:43, Guozhang Wang wrote:
>>> Hello Bruno,
>>>
>>> Finally got some time to review your KIP and the discussion thread
>>> now. A few comments below:
>>>
>>> 1) I'm with Matthias about the newly added numberOfAliveStreamThreads
>>> vs. the existing localThreadsMetadata: to me it seems we can always
>>> achieve the first based on the second. It does not seem worthwhile to
>>> provide some "syntactic sugar" in the API; we can just let users do the
>>> filtering themselves.
>>
>> I am not married to that method. I removed it.
>>
>>> Furthermore, I'm wondering what's the rationale behind removing the
>>> DEAD threads from localThreadsMetadata()? Personally I feel returning
>>> all threads, including those that were ever closed, either due to an
>>> exception or due to removeStreamThread, would be beneficial for
>>> debugging purposes, as long as within the lifetime of an instance we
>>> expect the number of such dead threads will not increase linearly ---
>>> and if we agree with that, maybe we can rename "removeStreamThread" to
>>> something like "terminateStreamThread", indicating it is only
>>> terminated but not removed --- and of course if users do not want to
>>> see those DEAD threads they can always filter them out. I'm just
>>> proposing that we should still leave the door open for those who want
>>> to check those ever terminated threads.
>>
>> I actually think the number of dead stream threads might increase
>> linearly. Assume users have a systematic error that continuously kills a
>> stream thread and they blindly start a new stream thread in the uncaught
>> exception handler. This scenario might be a mistake, but if the
>> systematic error does not occur at a high rate, it could also be a
>> strategy to keep the application running during the investigation of the
>> systematic error.
>>
>> IMO, removing dead stream threads makes Kafka Streams more robust
>> because it prevents a possibly unbounded increase of memory usage. If
>> users want to debug the dead stream threads, they can monitor the number
>> of dead threads with the metric proposed in the KIP and they could
>> additionally log the metadata of the dying stream thread in the uncaught
>> exception handler. I do not think that there is a need to keep dead
>> stream threads around.
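A small Python simulation of this scenario, under stated assumptions: a hypothetical systematic error kills one stream thread per cycle, the handler blindly adds a replacement, and `dead_metric` stands in for the dead-thread metric proposed in the KIP. It compares keeping every DEAD thread with dropping DEAD threads on the next add call.

```python
from typing import Dict


def simulate(crashes: int, remove_dead: bool, live_threads: int = 2) -> Dict[str, int]:
    """Hypothetical model: a systematic error kills one stream thread per
    cycle and the uncaught exception handler blindly adds a replacement."""
    states = {f"T{i}": "RUNNING" for i in range(live_threads)}
    next_id = live_threads
    dead_metric = 0  # stands in for a "number of dead threads" metric
    for _ in range(crashes):
        victim = next(n for n, s in states.items() if s == "RUNNING")
        states[victim] = "DEAD"
        dead_metric += 1
        if remove_dead:
            # Proposal: the next addStreamThread() call drops DEAD threads.
            states = {n: s for n, s in states.items() if s != "DEAD"}
        states[f"T{next_id}"] = "RUNNING"  # the handler's replacement thread
        next_id += 1
    return {"bookkept": len(states), "dead_metric": dead_metric}


for crashes in (10, 1000):
    print(crashes,
          simulate(crashes, remove_dead=False),
          simulate(crashes, remove_dead=True))
```

Without removal the bookkept set grows by one entry per crash (2 + crashes here), while with removal it stays at the number of live threads; the metric still counts every death, so the debugging signal is kept.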


>>> 2) I think it would help to write down some example user code in the
>>> exception handler to illustrate how this would be implemented. For
>>> example, we know that in practice the handler needs to maintain a "this"
>>> reference to the instance anyway in order to shut down the whole
>>> instance or add/terminate threads dynamically, but I want to see if we
>>> have listed all possible call paths, like a) a thread's handler logic
>>> to terminate another thread, b) a thread's handler to add new threads,
>>> etc., and that they are all appropriately supported without deadlocks.
>>
>> I added an example for an uncaught exception handler that adds a stream
>> thread to the KIP. Removing a stream thread in an uncaught exception
>> handler doesn't seem like a common use case to me. Nevertheless, we need
>> to make sure that we do not run into a deadlock in that case. I will
>> consider that during the implementation and write tests to check for
>> deadlocks.
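A minimal Python sketch of such a handler, assuming a hypothetical `StreamsClientModel` in which each stream thread catches its own uncaught exception and passes it to a user handler that runs on the dying thread. The handler keeps a reference to the client (the "this" reference mentioned above) and replaces the failed thread. The lock is held only for bookkeeping and never while a handler runs, which is one simple way to keep this call path deadlock-free; it is not a claim about how Kafka Streams implements it.

```python
import threading
from typing import Callable, List, Optional

Handler = Callable[[str, BaseException], None]


class StreamsClientModel:
    """Hypothetical client model: stream threads hand uncaught exceptions to a
    user handler that runs on the dying thread."""

    def __init__(self, work: Callable[[str], None]) -> None:
        self._work = work
        self._lock = threading.Lock()
        self._count = 0
        self.threads: List[threading.Thread] = []
        self.handler: Optional[Handler] = None

    def add_stream_thread(self) -> str:
        with self._lock:  # held only for bookkeeping
            self._count += 1
            name = f"app-StreamThread-{self._count}"
            thread = threading.Thread(target=self._run, args=(name,), name=name)
            self.threads.append(thread)
        thread.start()  # started outside the lock
        return name

    def _run(self, name: str) -> None:
        try:
            self._work(name)
        except Exception as error:
            if self.handler is not None:
                # No client lock is held here, so the handler may call
                # add_stream_thread() without deadlocking.
                self.handler(name, error)


def work(name: str) -> None:
    # Hypothetical workload: only the first stream thread fails.
    if name == "app-StreamThread-1":
        raise RuntimeError(f"{name} hit a fatal error")


client = StreamsClientModel(work)
replacements = []


def replace_failed_thread(name: str, error: BaseException) -> None:
    replacements.append((name, client.add_stream_thread()))


client.handler = replace_failed_thread
client.add_stream_thread()

# Join threads in creation order; a replacement is registered before the
# failed thread finishes, so it is already in the list when we reach it.
i = 0
while i < len(client.threads):
    client.threads[i].join()
    i += 1
print(replacements)  # [('app-StreamThread-1', 'app-StreamThread-2')]
```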

>> Shutting down the Kafka Streams client from inside an uncaught exception
>> handler is outside the scope of this KIP. In the beginning it was part
>> of the KIP, but during the discussion it turned out that we can fix our
>> existing close() method to accomplish the shutdown from inside an
>> uncaught exception handler. But I completely agree with you that we need
>> to ensure that we do not run into a deadlock in this case.
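One way a blocking close() can be made safe to call from a stream thread's own exception handler, sketched in Python under assumptions: `ClosableClient` is a hypothetical model in which close() signals all stream threads to stop and then joins every thread except the calling one (a thread waiting for itself would never finish; Python raises RuntimeError if a thread tries to join itself). It illustrates the deadlock concern only and is not the actual change to KafkaStreams#close().

```python
import threading
from typing import List


class ClosableClient:
    """Hypothetical model: close() stops and joins all stream threads."""

    def __init__(self) -> None:
        self.ready = threading.Event()  # set once every thread has started
        self.stop = threading.Event()
        self.threads: List[threading.Thread] = []

    def start(self, n: int, failing_index: int) -> None:
        for i in range(n):
            self.threads.append(
                threading.Thread(target=self._run, args=(i == failing_index,)))
        for t in self.threads:
            t.start()
        self.ready.set()

    def _run(self, fail: bool) -> None:
        self.ready.wait()
        try:
            if fail:
                raise RuntimeError("fatal error")
            self.stop.wait()  # normal processing until shutdown is requested
        except RuntimeError:
            # The "uncaught exception handler" shuts down the whole client.
            self.close()

    def close(self) -> None:
        self.stop.set()
        current = threading.current_thread()
        for t in self.threads:
            # Skip the calling thread: waiting for itself would never finish.
            if t is not current:
                t.join()


client = ClosableClient()
client.start(n=3, failing_index=0)
for t in client.threads:
    t.join(timeout=5)
print(all(not t.is_alive() for t in client.threads))  # True
```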




>>> Guozhang
>>>
>>> On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax wrote:
>>>> I would prefer to not add a new method. It seems unnecessary.
>>>> `localThreadsMetadata` does return all threads in all states(*) and
>>>> thus provides full insight.
>>>>
>>>> (*) A thread in state DEAD could be returned as long as it's not
>>>> removed yet.
>>>>
>>>> I don't see any advantage in pre-filtering threads and excluding
>>>> threads in state CREATED or PENDING_SHUTDOWN. Even if a CREATED thread
>>>> is not started yet, it is still "alive" in a broader sense. For
>>>> example, if a user wants to scale out to 10 threads, and 8 are RUNNING
>>>> and 2 are in state CREATED, a user won't need to add 2 more threads --
>>>> there are already 10 threads.
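The point that an alive-thread count can be derived from the returned metadata, and the scale-out example above, can be sketched in Python as follows. States are passed as plain strings named after the stream thread states mentioned in this thread; `threads_to_add` and its `not_alive` parameter are hypothetical, and treating only DEAD as not counting by default is an assumption, since how to treat PENDING_SHUTDOWN is still being debated at this point.

```python
from typing import Collection, Iterable


def threads_to_add(states: Iterable[str], target: int,
                   not_alive: Collection[str] = ("DEAD",)) -> int:
    """How many threads to add to reach `target`, counting every thread whose
    state is not in `not_alive` (by default only DEAD) as alive."""
    alive = sum(1 for state in states if state not in not_alive)
    return max(0, target - alive)


# 8 RUNNING + 2 CREATED: there are already 10 threads, so nothing to add.
print(threads_to_add(["RUNNING"] * 8 + ["CREATED"] * 2, target=10))  # 0

# 8 RUNNING + 1 DEAD: the dead thread does not count.
print(threads_to_add(["RUNNING"] * 8 + ["DEAD"], target=10))  # 2

# Variant that also excludes threads that are shutting down.
print(threads_to_add(["RUNNING"] * 9 + ["PENDING_SHUTDOWN"], target=10,
                     not_alive={"DEAD", "PENDING_SHUTDOWN"}))  # 1
```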

>>>> For PENDING_SHUTDOWN and scaling in, it would be different I guess, as
>>>> the proposal would be to filter them out right away. However, filtering
>>>> them out does not actually seem to be "correct", as a thread in


2020-09-15 Thread Guozhang Wang
Thanks Bruno, your replies make sense to me. As for
localThreadsMetadata() itself, I'd like to clarify whether it would return
all still-bookkept threads, or whether it would specifically filter out
DEAD threads even if they are not yet removed.

Otherwise, the KIP LGTM.

Guozhang

On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna wrote:

> Hi Guozhang,
>
> Thank you for your feedback. I replied inline.
>
> Best,
> Bruno
>
> On 09.09.20 23:43, Guozhang Wang wrote:
> > Hello Bruno,
> >
> > Finally got some time to review your KIP and the discussion thread
> > now. A few comments below:
> >
> > 1) I'm with Matthias about the newly added numberOfAliveStreamThreads
> > vs. the existing localThreadsMetadata: to me it seems we can always
> > achieve the first based on the second. It does not seem worthwhile to
> > provide some "syntactic sugar" in the API; we can just let users do the
> > filtering themselves.
>
> I am not married to that method. I removed it.
>
> > Furthermore, I'm wondering what's the rationale behind removing the
> > DEAD threads from localThreadsMetadata()? Personally I feel returning
> > all threads, including those that were ever closed, either due to an
> > exception or due to removeStreamThread, would be beneficial for
> > debugging purposes, as long as within the lifetime of an instance we
> > expect the number of such dead threads will not increase linearly ---
> > and if we agree with that, maybe we can rename "removeStreamThread" to
> > something like "terminateStreamThread", indicating it is only
> > terminated but not removed --- and of course if users do not want to
> > see those DEAD threads they can always filter them out. I'm just
> > proposing that we should still leave the door open for those who want
> > to check those ever terminated threads.
> >
>
> I actually think the number of dead stream threads might increase
> linearly. Assume users have a systematic error that continuously kills a
> stream thread and they blindly start a new stream thread in the uncaught
> exception handler. This scenario might be a mistake but if the
> systematic error does not occur at a high rate, it could also be a
> strategy to keep the application running during the investigation of the
> systematic error.
>
> IMO, removing dead stream threads makes Kafka Streams more robust
> because it prevents a possibly unbounded increase in memory usage. If
> users want to debug the dead stream threads they can monitor the number
> of dead threads with the metric proposed in the KIP and they could
> additionally log the metadata of the dying stream thread in the uncaught
> exception handler. I do not think that there is a need to keep dead stream
> threads around.
>
> > 2) I think it would help to write down some example user code in the
> > exception handler to illustrate how this would be implemented -- e.g. we
> > know that in practice the handler needs to maintain a "this" reference to
> > the instance anyway in order to shut down the whole instance or to
> > add/terminate threads dynamically, but I want to see if we have listed all
> > possible call paths, like: a) a thread's handler logic terminating another
> > thread, b) a thread's handler adding new threads, etc., and whether they
> > are all appropriately supported without deadlocks.
> >
>
> I added an example for an uncaught exception handler that adds a stream
> thread to the KIP. Removing a stream thread in an uncaught exception
> handler doesn't seem a common use case to me. Nevertheless, we need to
> make sure that we do not run into a deadlock in that case. I will consider
> that during the implementation and write tests to check for deadlocks.
>
> Shutting down the Kafka Streams client from inside an uncaught exception
> handler is outside the scope of this KIP. In the beginning it was part
> of the KIP, but during the discussion it turned out that we can fix our
> existing close() method to accomplish the shutdown from inside an
> uncaught exception handler. But I completely agree with you that we need
> to ensure that we do not run into a deadlock in this case.
>
>
> >
> > Guozhang
> >
> >
> > On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax wrote:
> >
> >> I would prefer to not add a new method. It seems unnecessary.
> >> `localThreadsMetadata` does return all threads in all states(*) and thus
> >> provides full insight.
> >>
> >> (*) A thread in state DEAD could be returned as long as it's not removed
> >> yet.
> >>
> >> I don't see any advantage in pre-filtering threads and excluding threads
> >> in state CREATED or PENDING_SHUTDOWN. Even if a CREATED thread is not
> >> started yet, it is still "alive" in a broader sense. For example, if a
> >> user wants to scale out to 10 threads, and 8 are RUNNING and 2 are in
> >> state CREATED, a user won't need to add 2 more threads -- there are
> >> already 10 threads.
> >>
> >> For PENDING_SHUTDOWN and scale in it would be different I guess, as the
> >> proposal would be to filter them out right away. However, filtering them
> >> seems actually not to be 


2020-09-15 Thread Bruno Cadonna

Hi Guozhang,

Thank you for your feedback. I replied inline.

Best,
Bruno

On 09.09.20 23:43, Guozhang Wang wrote:

Hello Bruno,

Finally got some time to review your KIP and the discussion thread now; a
few comments below:

1) I'm with Matthias about the newly added numberOfAliveStreamThreads vs.
the existing localThreadsMetadata: to me it seems we can always derive the
first from the second. It doesn't seem worth providing this "syntactic
sugar" in the API; we can just let users do the filtering themselves.


I am not married to that method. I removed it.


Furthermore, I'm wondering what the rationale is behind removing the DEAD
threads from localThreadsMetadata(). Personally I feel returning all
threads, including those that were ever closed, either due to an exception
or due to removeStreamThread, would be beneficial for debugging purposes, as
long as we expect the number of such dead threads not to grow linearly
within the lifetime of an instance --- and if we agree with that, maybe we
can rename "removeStreamThread" to something like "terminateStreamThread",
indicating that the thread is only terminated but not removed --- and of
course if users do not want to see those DEAD threads they can always
filter them out. I'm just proposing that we should still leave the door
open for those who want to inspect threads that were terminated.



I actually think the number of dead stream threads might increase 
linearly. Assume users have a systematic error that continuously kills a 
stream thread and they blindly start a new stream thread in the uncaught 
exception handler. This scenario might be a mistake but if the 
systematic error does not occur at a high rate, it could also be a 
strategy to keep the application running during the investigation of the 
systematic error.


IMO, removing dead stream threads makes Kafka Streams more robust
because it prevents a possibly unbounded increase in memory usage. If
users want to debug the dead stream threads they can monitor the number
of dead threads with the metric proposed in the KIP and they could
additionally log the metadata of the dying stream thread in the uncaught
exception handler. I do not think that there is a need to keep dead stream
threads around.



2) I think it would help to write down some example user code in the
exception handler to illustrate how this would be implemented -- e.g. we
know that in practice the handler needs to maintain a "this" reference to
the instance anyway in order to shut down the whole instance or to
add/terminate threads dynamically, but I want to see if we have listed all
possible call paths, like: a) a thread's handler logic terminating another
thread, b) a thread's handler adding new threads, etc., and whether they
are all appropriately supported without deadlocks.



I added an example for an uncaught exception handler that adds a stream 
thread to the KIP. Removing a stream thread in an uncaught exception 
handler doesn't seem a common use case to me. Nevertheless, we need to 
make sure that we do not run into a deadlock in that case. I will consider
that during the implementation and write tests to check for deadlocks.
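To make one of these call paths concrete, here is a minimal sketch of an uncaught exception handler that records the dying thread and starts a replacement. It assumes a hypothetical `StreamThreadScaler` interface whose `addStreamThread()` returns the new thread's name as an `Optional<String>`, as proposed in this thread; `StreamThreadScaler`, `FakeScaler`, `ReplacingHandler` and `HandlerDemo` are illustration-only stand-ins, not Kafka Streams classes. Only `Thread.UncaughtExceptionHandler` is a real JDK API here.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the client API discussed in this KIP.
interface StreamThreadScaler {
    Optional<String> addStreamThread();
}

// Replaces a dying stream thread from inside its uncaught exception handler.
final class ReplacingHandler implements Thread.UncaughtExceptionHandler {
    private final StreamThreadScaler client;
    final List<String> log = new CopyOnWriteArrayList<>();

    ReplacingHandler(StreamThreadScaler client) {
        this.client = client;
    }

    @Override
    public void uncaughtException(Thread dying, Throwable error) {
        // Capture debugging info now: the dead thread is assumed to be
        // removed from the client afterwards.
        log.add("died: " + dying.getName() + " (" + error.getMessage() + ")");
        // Start a replacement. Never wait for the dying thread here: this
        // code runs on that very thread.
        client.addStreamThread().ifPresent(name -> log.add("added: " + name));
    }
}

// Illustration-only client that hands out increasing thread names.
final class FakeScaler implements StreamThreadScaler {
    private final AtomicInteger next = new AtomicInteger(2);

    @Override
    public Optional<String> addStreamThread() {
        return Optional.of("app-StreamThread-" + next.getAndIncrement());
    }
}

final class HandlerDemo {
    static List<String> run() {
        ReplacingHandler handler = new ReplacingHandler(new FakeScaler());
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("boom");
        }, "app-StreamThread-1");
        worker.setUncaughtExceptionHandler(handler);
        worker.start();
        try {
            worker.join(); // the handler has completed once join() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handler.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The handler only triggers work and never waits for the dying thread to terminate; waiting on the current thread from inside its own handler would block forever.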


Shutting down the Kafka Streams client from inside an uncaught exception 
handler is outside the scope of this KIP. In the beginning it was part 
of the KIP, but during the discussion it turned out that we can fix our 
existing close() method to accomplish the shutdown from inside an 
uncaught exception handler. But I completely agree with you that we need 
to ensure that we do not run into a deadlock in this case.





Guozhang


On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax  wrote:


I would prefer to not add a new method. It seems unnecessary.
`localThreadsMetadata` does return all threads in all states(*) and thus
provides full insight.

(*) A thread in state DEAD could be returned as long as it's not removed
yet.

I don't see any advantage in pre-filtering threads and excluding threads
in state CREATED or PENDING_SHUTDOWN. Even if a CREATED thread is not
started yet, it is still "alive" in a broader sense. For example, if a
user wants to scale out to 10 threads, and 8 are RUNNING and 2 are in
state CREATED, a user won't need to add 2 more threads -- there are
already 10 threads.

For PENDING_SHUTDOWN and scale in it would be different I guess, as the
proposal would be to filter them out right away. However, filtering them
seems actually not to be "correct", as a thread in PENDING_SHUTDOWN
might still process data and it's thus still "alive".

If there is still a need later to add a new method about "alive threads",
we can always add it as a follow-up -- removing things is much harder.

I also don't think that there is value in returning names of dead
threads, as we recycle names.


-Matthias


On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote:

I agree that the current behavior of localThreadsMetadata() does not seem
to match, but it seems like we will be forced to change it to only return
currently-alive threads. For one thing, we plan to recycle 


2020-09-09 Thread Guozhang Wang
Hello Bruno,

Finally got some time to review your KIP and the discussion thread now; a
few comments below:

1) I'm with Matthias about the newly added numberOfAliveStreamThreads vs.
the existing localThreadsMetadata: to me it seems we can always derive the
first from the second. It doesn't seem worth providing this "syntactic
sugar" in the API; we can just let users do the filtering themselves.
Furthermore, I'm wondering what the rationale is behind removing the DEAD
threads from localThreadsMetadata(). Personally I feel returning all
threads, including those that were ever closed, either due to an exception
or due to removeStreamThread, would be beneficial for debugging purposes, as
long as we expect the number of such dead threads not to grow linearly
within the lifetime of an instance --- and if we agree with that, maybe we
can rename "removeStreamThread" to something like "terminateStreamThread",
indicating that the thread is only terminated but not removed --- and of
course if users do not want to see those DEAD threads they can always
filter them out. I'm just proposing that we should still leave the door
open for those who want to inspect threads that were terminated.

2) I think it would help to write down some example user code in the
exception handler to illustrate how this would be implemented -- e.g. we
know that in practice the handler needs to maintain a "this" reference to
the instance anyway in order to shut down the whole instance or to
add/terminate threads dynamically, but I want to see if we have listed all
possible call paths, like: a) a thread's handler logic terminating another
thread, b) a thread's handler adding new threads, etc., and whether they
are all appropriately supported without deadlocks.


Guozhang


On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax  wrote:

> I would prefer to not add a new method. It seems unnecessary.
> `localThreadsMetadata` does return all threads in all states(*) and thus
> provides full insight.
>
> (*) A thread in state DEAD could be returned as long as it's not removed
> yet.
>
> I don't see any advantage in pre-filtering threads and excluding threads
> in state CREATED or PENDING_SHUTDOWN. Even if a CREATED thread is not
> started yet, it is still "alive" in a broader sense. For example, if a
> user wants to scale out to 10 threads, and 8 are RUNNING and 2 are in
> state CREATED, a user won't need to add 2 more threads -- there are
> already 10 threads.
>
> For PENDING_SHUTDOWN and scale in it would be different I guess, as the
> proposal would be to filter them out right away. However, filtering them
> seems actually not to be "correct", as a thread in PENDING_SHUTDOWN
> might still process data and it's thus still "alive".
>
> If there is still a need later to add a new method about "alive threads",
> we can always add it as a follow-up -- removing things is much harder.
>
> I also don't think that there is value in returning names of dead
> threads, as we recycle names.
>
>
> -Matthias
>
>
> On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote:
> > I agree that the current behavior of localThreadsMetadata() does not seem
> > to match, but it seems like we will be forced to change it to only return
> > currently-alive threads. For one thing, we plan to recycle old thread names.
> > It would be pretty confusing for a user to get two (or more) ThreadMetadata
> > objects returned with the same name, since AFAICT this is the only
> > distinguishing identifier of stream threads. I think we should enforce that
> > only live threads are returned by localThreadsMetadata(). Plus, as Matthias
> > pointed out, we plan to remove dead threads from the KafkaStreams client,
> > so still returning them in the metadata would be extremely odd.
> >
> > If we think that there might be some use case that requires knowing which
> > threads have died, we could consider adding a method that returns the
> > names of dead threads. But the only use case I can imagine would probably
> > be better served by a callback that gets invoked when the thread dies,
> > which we already have.
> >
> > On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna wrote:
> >
> >> Hi Matthias and Sophie,
> >>
> >> I agree that localThreadsMetadata() can be used here. However,
> >> localThreadsMetadata() returns all stream threads irrespective of
> >> their states. Alive stream threads are specified as being in one of the
> >> following states: RUNNING, STARTING, PARTITIONS_REVOKED, and
> >> PARTITIONS_ASSIGNED. Hence, users would need to filter the result of
> >> localThreadsMetadata(). I thought it would be neat to have a method
> >> that hides this filtering and returns the number of alive stream
> >> threads, because that is the most basic information you might need to
> >> decide about adding or removing stream threads. For all more advanced
> >> use cases users should use localThreadsMetadata(). I am also happy with
> >> removing the method. WDYT?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 09.09.20 03:51, Matthias J. Sax wrote:
> >>> Currently 


2020-09-09 Thread Matthias J. Sax
I would prefer to not add a new method. It seems unnecessary.
`localThreadsMetadata` does return all threads in all states(*) and thus
provides full insight.

(*) A thread in state DEAD could be returned as long as it's not removed
yet.

I don't see any advantage in pre-filtering threads and excluding threads
in state CREATED or PENDING_SHUTDOWN. Even if a CREATED thread is not
started yet, it is still "alive" in a broader sense. For example, if a
user wants to scale out to 10 threads, and 8 are RUNNING and 2 are in
state CREATED, a user won't need to add 2 more threads -- there are
already 10 threads.

For PENDING_SHUTDOWN and scale in it would be different I guess, as the
proposal would be to filter them out right away. However, filtering them
seems actually not to be "correct", as a thread in PENDING_SHUTDOWN
might still process data and it's thus still "alive".

If there is still a need later to add a new method about "alive threads",
we can always add it as a follow-up -- removing things is much harder.

I also don't think that there is value in returning names of dead
threads, as we recycle names.
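A minimal sketch of the counting argument above, assuming the caller already knows the state of each local thread: everything that is not DEAD counts towards the scale-out target, so 8 RUNNING plus 2 CREATED threads already meet a target of 10. `ThreadState` and `ScaleOut` are hypothetical names; the enum only mirrors the state names used in this discussion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Mirrors the stream thread state names used in this thread (illustration only).
enum ThreadState {
    CREATED, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED,
    RUNNING, PENDING_SHUTDOWN, DEAD
}

final class ScaleOut {
    // Every thread that has not reached DEAD counts, including CREATED
    // threads that are not started yet and PENDING_SHUTDOWN threads that
    // may still be processing data.
    static long existingThreads(List<ThreadState> states) {
        return states.stream().filter(s -> s != ThreadState.DEAD).count();
    }

    // How many threads the caller would still need to add to reach `target`.
    static long threadsToAdd(List<ThreadState> states, int target) {
        return Math.max(0, target - existingThreads(states));
    }

    public static void main(String[] args) {
        List<ThreadState> states = new ArrayList<>(Collections.nCopies(8, ThreadState.RUNNING));
        states.addAll(Collections.nCopies(2, ThreadState.CREATED));
        System.out.println(threadsToAdd(states, 10)); // prints 0
    }
}
```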


-Matthias


On 9/9/20 10:04 AM, Sophie Blee-Goldman wrote:
> I agree that the current behavior of localThreadsMetadata() does not seem
> to match, but it seems like we will be forced to change it to only return
> currently-alive threads. For one thing, we plan to recycle old thread names.
> It would be pretty confusing for a user to get two (or more) ThreadMetadata
> objects returned with the same name, since AFAICT this is the only
> distinguishing identifier of stream threads. I think we should enforce that
> only live threads are returned by localThreadsMetadata(). Plus, as Matthias
> pointed out, we plan to remove dead threads from the KafkaStreams client,
> so still returning them in the metadata would be extremely odd.
> 
> If we think that there might be some use case that requires knowing which
> threads have died, we could consider adding a method that returns the
> names of dead threads. But the only use case I can imagine would probably
> be better served by a callback that gets invoked when the thread dies, which
> we already have.
> 
> On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna  wrote:
> 
>> Hi Matthias and Sophie,
>>
>> I agree that localThreadsMetadata() can be used here. However,
>> localThreadsMetadata() returns all stream threads irrespective of
>> their states. Alive stream threads are specified as being in one of the
>> following states: RUNNING, STARTING, PARTITIONS_REVOKED, and
>> PARTITIONS_ASSIGNED. Hence, users would need to filter the result of
>> localThreadsMetadata(). I thought it would be neat to have a method
>> that hides this filtering and returns the number of alive stream
>> threads, because that is the most basic information you might need to
>> decide about adding or removing stream threads. For all more advanced
>> use cases users should use localThreadsMetadata(). I am also happy with
>> removing the method. WDYT?
>>
>> Best,
>> Bruno
>>
>> On 09.09.20 03:51, Matthias J. Sax wrote:
>>> Currently, we don't clean up dead threads, but the KIP proposes to change
>>> this:
>>>
>>>> Stream threads that are in state DEAD will be removed from the stream
>>>> threads of a Kafka Streams client.
>>>
>>>
>>> -Matthias
>>>
>>> On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote:
>>>> Ah, I forgot about localThreadsMetadata(). In that case I agree, there's
>>>> no reason to introduce a new method when we can get both the names and
>>>> number of all running threads from this.
>>>>
>>>> I assume that we would update localThreadsMetadata to only return
>>>> currently alive threads as part of this KIP -- at a quick glance, it
>>>> seems like we don't do any pruning of dead threads at the moment.
>>>>
>>>> On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax wrote:
>>>>
>>>>> I am not sure if we need a new method? There is already
>>>>> `localThreadsMetadata()`. What do we gain by adding a new one?
>>>>>
>>>>> Returning the thread's name (as `Optional`) for both add() and
>>>>> remove() is fine with me.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
>>>>>> Sorry Bruno, I think I missed the end of your message with the
>>>>>> numberOfAliveStreamThreads() proposal. I agree, that would be better
>>>>>> than the alternatives I listed. That said:
>>>>>>
>>>>>>> They rather suggest that the method returns a list of handles to the
>>>>>>> stream threads.
>>>>>>
>>>>>> I hadn't thought of that originally, but now that you mention it, this
>>>>>> might be a good idea. I don't think we should return actual handles on
>>>>>> the threads, but maybe a list of the thread names rather than a single
>>>>>> number of currently alive threads.
>>>>>>
>>>>>> Since we seem to think it would be difficult if not impossible to keep
>>>>>> track of the number of running stream threads, we should apply the
>>>>>> same reasoning to the names


2020-09-09 Thread Sophie Blee-Goldman
I agree that the current behavior of localThreadsMetadata() does not seem
to match, but it seems like we will be forced to change it to only return
currently-alive threads. For one thing, we plan to recycle old thread names.
It would be pretty confusing for a user to get two (or more) ThreadMetadata
objects returned with the same name, since AFAICT this is the only
distinguishing identifier of stream threads. I think we should enforce that
only live threads are returned by localThreadsMetadata(). Plus, as Matthias
pointed out, we plan to remove dead threads from the KafkaStreams client,
so still returning them in the metadata would be extremely odd.

If we think that there might be some use case that requires knowing which
threads have died, we could consider adding a method that returns the
names of dead threads. But the only use case I can imagine would probably
be better served by a callback that gets invoked when the thread dies, which
we already have.
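The name-recycling concern can be illustrated with a small model. It assumes, hypothetically, that thread names follow a `<clientId>-StreamThread-<index>` pattern and that a new thread takes the lowest free index; `ThreadNamePool` and `RecycleDemo` are illustration-only names, not the actual Kafka Streams naming code. If DEAD threads stayed in the metadata, the same name would be reported twice.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical name allocator that reuses the lowest free index.
final class ThreadNamePool {
    private final String clientId;
    private final Set<Integer> inUse = new HashSet<>();

    ThreadNamePool(String clientId) {
        this.clientId = clientId;
    }

    String acquire() {
        int index = 1;
        while (inUse.contains(index)) {
            index++;
        }
        inUse.add(index);
        return clientId + "-StreamThread-" + index;
    }

    void release(String name) {
        int index = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
        inUse.remove(index);
    }
}

final class RecycleDemo {
    // Names a metadata call would report if DEAD threads were kept around.
    static List<String> namesIfDeadThreadsWereKept() {
        ThreadNamePool pool = new ThreadNamePool("app");
        List<String> reported = new ArrayList<>();
        String first = pool.acquire();   // app-StreamThread-1
        reported.add(first);
        reported.add(pool.acquire());    // app-StreamThread-2
        pool.release(first);             // thread 1 dies and its index is freed
        reported.add(pool.acquire());    // the replacement reuses index 1
        return reported;
    }

    public static void main(String[] args) {
        System.out.println(namesIfDeadThreadsWereKept());
    }
}
```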

On Tue, Sep 8, 2020 at 11:46 PM Bruno Cadonna  wrote:

> Hi Matthias and Sophie,
>
> I agree that localThreadsMetadata() can be used here. However,
> localThreadsMetadata() returns all stream threads irrespective of
> their states. Alive stream threads are specified as being in one of the
> following states: RUNNING, STARTING, PARTITIONS_REVOKED, and
> PARTITIONS_ASSIGNED. Hence, users would need to filter the result of
> localThreadsMetadata(). I thought it would be neat to have a method
> that hides this filtering and returns the number of alive stream
> threads, because that is the most basic information you might need to
> decide about adding or removing stream threads. For all more advanced
> use cases users should use localThreadsMetadata(). I am also happy with
> removing the method. WDYT?
>
> Best,
> Bruno
>
> On 09.09.20 03:51, Matthias J. Sax wrote:
> > Currently, we don't clean up dead threads, but the KIP proposes to change
> > this:
> >
> >> Stream threads that are in state DEAD will be removed from the stream
> >> threads of a Kafka Streams client.
> >
> >
> > -Matthias
> >
> > On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote:
> >> Ah, I forgot about localThreadsMetadata(). In that case I agree, there's
> >> no reason to introduce a new method when we can get both the names and
> >> number of all running threads from this.
> >>
> >> I assume that we would update localThreadsMetadata to only return
> >> currently alive threads as part of this KIP -- at a quick glance, it
> >> seems like we don't do any pruning of dead threads at the moment.
> >>
> >> On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax wrote:
> >>
> >>> I am not sure if we need a new method? There is already
> >>> `localThreadsMetadata()`. What do we gain by adding a new one?
> >>>
> >>> Returning the thread's name (as `Optional`) for both add() and
> >>> remove() is fine with me.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
> >>>> Sorry Bruno, I think I missed the end of your message with the
> >>>> numberOfAliveStreamThreads() proposal. I agree, that would be better
> >>>> than the alternatives I listed. That said:
> >>>>
> >>>>> They rather suggest that the method returns a list of handles to the
> >>>>> stream threads.
> >>>>
> >>>> I hadn't thought of that originally, but now that you mention it, this
> >>>> might be a good idea. I don't think we should return actual handles on
> >>>> the threads, but maybe a list of the thread names rather than a single
> >>>> number of currently alive threads.
> >>>>
> >>>> Since we seem to think it would be difficult if not impossible to keep
> >>>> track of the number of running stream threads, we should apply the
> >>>> same reasoning to the names and not assume the user can/will keep
> >>>> track of every thread returned by addStreamThread() or
> >>>> removeStreamThread(). Users should generally take any required action
> >>>> immediately after adding/removing the thread -- e.g. deregistering the
> >>>> thread metrics -- but it might still be useful to provide a
> >>>> convenience method listing all of the current threads.
> >>>>
> >>>> And of course you could still get the number of threads easily by
> >>>> invoking size() on the returned list (or ordered set?).
> >>>>
> >>>> On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna wrote:
> >>>>
> >>>>> Thank you again for the feedback Sophie!
> >>>>>
> >>>>> As I tried to point out in my previous e-mail, removing a stream
> >>>>> thread from a Kafka Streams client that does not have alive stream
> >>>>> threads is nothing exceptional for the client per se. However, it can
> >>>>> become exceptional within the context of the user. For example, if
> >>>>> users want to remove a stream thread from a client without alive
> >>>>> stream threads because one of their metrics says so, then this is
> >>>>> exceptional in the context of that user metric, not in


2020-09-09 Thread Bruno Cadonna

Hi Matthias and Sophie,

I agree that localThreadsMetadata() can be used here. However, 
localThreadsMetadata() returns all stream threads irrespective of
their states. Alive stream threads are specified as being in one of the
following states: RUNNING, STARTING, PARTITIONS_REVOKED, and
PARTITIONS_ASSIGNED. Hence, users would need to filter the result of
localThreadsMetadata(). I thought it would be neat to have a method
that hides this filtering and returns the number of alive stream 
threads, because that is the most basic information you might need to 
decide about adding or removing stream threads. For all more advanced 
use cases users should use localThreadsMetadata(). I am also happy with 
removing the method. WDYT?
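A minimal sketch of the filtering users would have to do themselves, using the four alive states listed above and returning the names in a sorted set so that `size()` gives the count, as suggested elsewhere in this thread. `ThreadInfo` and `AliveThreads` are hypothetical stand-ins that model only a thread name and a state string, not the real metadata class.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for per-thread metadata: just a name and a state.
final class ThreadInfo {
    final String name;
    final String state;

    ThreadInfo(String name, String state) {
        this.name = name;
        this.state = state;
    }
}

final class AliveThreads {
    // The "alive" states as defined in the KIP draft discussed here.
    private static final Set<String> ALIVE = new HashSet<>(Arrays.asList(
        "RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED"));

    // Names of alive threads in a stable order; size() yields the count.
    static SortedSet<String> aliveNames(Collection<ThreadInfo> metadata) {
        SortedSet<String> names = new TreeSet<>();
        for (ThreadInfo t : metadata) {
            if (ALIVE.contains(t.state)) {
                names.add(t.name);
            }
        }
        return names;
    }
}
```

Note that under this narrower definition CREATED and PENDING_SHUTDOWN threads are excluded, which is exactly the point Matthias disputes in his reply.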


Best,
Bruno

On 09.09.20 03:51, Matthias J. Sax wrote:

Currently, we don't clean up dead threads, but the KIP proposes to change
this:


Stream threads that are in state DEAD will be removed from the stream threads 
of a Kafka Streams client.



-Matthias

On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote:

Ah, I forgot about localThreadsMetadata(). In that case I agree, there's
no reason to introduce a new method when we can get both the names and
number of all running threads from this.

I assume that we would update localThreadsMetadata to only return currently
alive threads as part of this KIP -- at a quick glance, it seems like we
don't do any pruning of dead threads at the moment.

On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax  wrote:


I am not sure if we need a new method? There is already
`localThreadsMetadata()`. What do we gain by adding a new one?

Returning the thread's name (as `Optional`) for both add() and
remove() is fine with me.


-Matthias

On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:

Sorry Bruno, I think I missed the end of your message with the
numberOfAliveStreamThreads() proposal. I agree, that would be better than
the alternatives I listed. That said:


They rather suggest that the method returns a list of handles to the
stream threads.

I hadn't thought of that originally, but now that you mention it, this
might be a good idea. I don't think we should return actual handles on the
threads, but maybe a list of the thread names rather than a single number
of currently alive threads.

Since we seem to think it would be difficult if not impossible to keep
track of the number of running stream threads, we should apply the same
reasoning to the names and not assume the user can/will keep track of
every thread returned by addStreamThread() or removeStreamThread(). Users
should generally take any required action immediately after adding/removing
the thread -- e.g. deregistering the thread metrics -- but it might still be
useful to provide a convenience method listing all of the current threads.

And of course you could still get the number of threads easily by invoking
size() on the returned list (or ordered set?).

On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna wrote:



Thank you again for the feedback Sophie!

As I tried to point out in my previous e-mail, removing a stream thread
from a Kafka Streams client that does not have alive stream threads is
nothing exceptional for the client per se. However, it can become
exceptional within the context of the user. For example, if users want
to remove a stream thread from a client without alive stream threads
because one of their metrics says so, then this is exceptional in the
context of that user metric, not in the context of the Kafka Streams
client. In that case, users should throw an exception and handle it.

Regarding returning null, I do not like to return null because from a
development point of view there is no distinction between returning null
because we have a bug in the code or returning null because there are no
alive stream threads. Additionally, Optional makes it more
explicit that the result could also be empty.
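A minimal sketch of an `Optional`-returning removal and of a caller that turns the empty case into its own exception, as described above. `InMemoryThreads` and `ScaleIn` are hypothetical stand-ins, not the Kafka Streams API; only the `Optional<String>` return type follows this discussion.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical in-memory stand-in for a client's alive stream threads.
final class InMemoryThreads {
    private final Deque<String> alive = new ArrayDeque<>();

    void add(String name) {
        alive.addLast(name);
    }

    // Empty when there is nothing to remove: a normal outcome for the client,
    // and clearly distinguishable from a null caused by a bug.
    Optional<String> removeStreamThread() {
        return Optional.ofNullable(alive.pollLast());
    }
}

final class ScaleIn {
    // The caller decides whether "nothing to remove" is exceptional in its
    // own context, e.g. because its metrics claimed over-provisioning.
    static String removeOneOrFail(InMemoryThreads client) {
        return client.removeStreamThread()
            .orElseThrow(() -> new IllegalStateException(
                "metrics said over-provisioned, but no alive stream thread to remove"));
    }
}
```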

Thank you for the alternative method names! However, with the names you
propose it is not immediately clear that the method returns a number of
stream threads. They rather suggest that the method returns a list of
handles to the stream threads. I chose to use "aliveStreamThreads" to be
consistent with the client-level metric "alive-stream-threads", which
reports the same number of stream threads that
numberOfAliveStreamThreads() should report. If others also think that
the proposed name in the KIP is too clumsy, I am open to renaming it,
though.


Best,
Bruno


On 08.09.20 20:12, Sophie Blee-Goldman wrote:

it's never a good sign when the discussion moves into the vote thread


Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes
again.
Thanks for redirecting me Bruno

I suppose it's unfair to expect the callers to keep perfect track of the
current number of stream threads, but it also seems like you shouldn't be
calling removeStreamThread() when there are no threads left. Either you're
just haphazardly removing threads and could unintentionally slip 


2020-09-08 Thread Matthias J. Sax
Currently, we don't clean up dead threads, but the KIP proposes to change
this:

> Stream threads that are in state DEAD will be removed from the stream threads 
> of a Kafka Streams client.
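A minimal sketch of the cleanup quoted above, assuming a hypothetical registry keyed by thread name: a thread that reaches DEAD is reported once to a listener (where a user could log its metadata for debugging) and then dropped, so repeated die-and-replace cycles do not grow the registry. `ThreadRegistry` is an illustration-only name, not a Kafka Streams class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical bookkeeping of stream threads by name.
final class ThreadRegistry {
    private final Map<String, String> stateByName = new LinkedHashMap<>();
    private final BiConsumer<String, String> onDeath; // (name, last known state)

    ThreadRegistry(BiConsumer<String, String> onDeath) {
        this.onDeath = onDeath;
    }

    void transition(String name, String newState) {
        if ("DEAD".equals(newState)) {
            // Report once, then forget the thread so memory stays bounded.
            String lastState = stateByName.remove(name);
            onDeath.accept(name, lastState);
        } else {
            stateByName.put(name, newState);
        }
    }

    // What a metadata query would report: only threads not yet removed.
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(stateByName);
    }
}
```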


-Matthias

On 9/8/20 2:37 PM, Sophie Blee-Goldman wrote:
> Ah, I forgot about localThreadsMetadata(). In that case I agree, there's
> no reason to introduce a new method when we can get both the names and
> number of all running threads from this.
>
> I assume that we would update localThreadsMetadata to only return currently
> alive threads as part of this KIP -- at a quick glance, it seems like we
> don't do any pruning of dead threads at the moment.
> 
> On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax  wrote:
> 
>> I am not sure if we need a new method? There is already
>> `localThreadsMetadata()`. What do we gain by adding a new one?
>>
>> Returning the thread's name (as `Optional`) for both add() and
>> remove() is fine with me.
>>
>>
>> -Matthias
>>
>> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
>>> Sorry Bruno, I think I missed the end of your message with the
>>> numberOfAliveStreamThreads() proposal. I agree, that would be better
>>> than the alternatives I listed. That said:
>>>
>>>> They rather suggest that the method returns a list of handles to the
>>>> stream threads.
>>>
>>> I hadn't thought of that originally, but now that you mention it, this
>>> might be a good idea. I don't think we should return actual handles on
>>> the threads, but maybe a list of the thread names rather than a single
>>> number of currently alive threads.
>>>
>>> Since we seem to think it would be difficult if not impossible to keep
>>> track of the number of running stream threads, we should apply the
>>> same reasoning to the names and not assume the user can/will keep
>>> track of every thread returned by addStreamThread() or
>>> removeStreamThread(). Users should generally take any required action
>>> immediately after adding/removing the thread -- e.g. deregistering the
>>> thread metrics -- but it might still be useful to provide a
>>> convenience method listing all of the current threads.
>>>
>>> And of course you could still get the number of threads easily by
>>> invoking size() on the returned list (or ordered set?).
>>>
>>> On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna wrote:
>>>
>>>> Thank you again for the feedback Sophie!
>>>>
>>>> As I tried to point out in my previous e-mail, removing a stream thread
>>>> from a Kafka Streams client that does not have alive stream threads is
>>>> nothing exceptional for the client per se. However, it can become
>>>> exceptional within the context of the user. For example, if users want
>>>> to remove a stream thread from a client without alive stream threads
>>>> because one of their metrics says so, then this is exceptional in the
>>>> context of that user metric, not in the context of the Kafka Streams
>>>> client. In that case, users should throw an exception and handle it.
>>>>
>>>> Regarding returning null, I do not like to return null because from a
>>>> development point of view there is no distinction between returning null
>>>> because we have a bug in the code or returning null because there are no
>>>> alive stream threads. Additionally, Optional makes it more
>>>> explicit that the result could also be empty.
>>>>
>>>> Thank you for the alternative method names! However, with the names you
>>>> propose it is not immediately clear that the method returns a number of
>>>> stream threads. They rather suggest that the method returns a list of
>>>> handles to the stream threads. I chose to use "aliveStreamThreads" to be
>>>> consistent with the client-level metric "alive-stream-threads", which
>>>> reports the same number of stream threads that
>>>> numberOfAliveStreamThreads() should report. If others also think that
>>>> the proposed name in the KIP is too clumsy, I am open to renaming it,
>>>> though.
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>>
>>>> On 08.09.20 20:12, Sophie Blee-Goldman wrote:
>>>>>> it's never a good sign when the discussion moves into the vote thread
>>>>>
>>>>> Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads
>>>>> strikes again.
>>>>> Thanks for redirecting me Bruno
>>>>>
>>>>> I suppose it's unfair to expect the callers to keep perfect track of
>>>>> the current number of stream threads, but it also seems like you
>>>>> shouldn't be calling removeStreamThread() when there are no threads
>>>>> left. Either you're just haphazardly removing threads and could
>>>>> unintentionally slip into a state of no running threads without
>>>>> realizing it, or more realistically, you're carefully removing threads
>>>>> based on some metric(s) that convey whether the system is over or
>>>>> under-provisioned. If your metrics say you're over-provisioned
 but
> there's
> not one thread running, well, that certainly sounds exceptional to me.
>> Or
> you 

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-08 Thread Sophie Blee-Goldman
Ah, I forgot about localThreadsMetadata(). In that case I agree, there's no
reason to introduce a new method when we can get both the names and the
number of all running threads from it.

I assume that we would update localThreadsMetadata() to return only
currently alive threads as part of this KIP -- at a quick glance, it seems
like we don't do any pruning of dead threads at the moment.
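
The pruning Sophie assumes can be sketched as a filter over per-thread metadata: drop threads that have reached DEAD, then derive both the names and the count from what remains. This is a minimal sketch under stated assumptions; `ThreadPhase`, `ThreadInfo`, and `AliveThreadView` are hypothetical stand-ins, not the Kafka Streams metadata classes.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical lifecycle states, loosely modeled on the discussion
// (not the real StreamThread.State enum).
enum ThreadPhase { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

// Hypothetical, simplified stand-in for per-thread metadata.
final class ThreadInfo {
    final String name;
    final ThreadPhase phase;

    ThreadInfo(String name, ThreadPhase phase) {
        this.name = name;
        this.phase = phase;
    }
}

final class AliveThreadView {
    // Keeps only threads that have not reached DEAD.
    static List<ThreadInfo> alive(List<ThreadInfo> all) {
        return all.stream()
                .filter(t -> t.phase != ThreadPhase.DEAD)
                .collect(Collectors.toList());
    }

    // Names of the alive threads, sorted; the count is simply size().
    static Set<String> aliveNames(List<ThreadInfo> all) {
        return alive(all).stream()
                .map(t -> t.name)
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

With pruning done in one place, callers do not need a separate counting method: the number of alive threads falls out of the same filtered view.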

On Tue, Sep 8, 2020 at 1:58 PM Matthias J. Sax  wrote:

> I am not sure if we need a new method? There is already
> `localThreadsMetadata()`. What do we gain by adding a new one?
>
> Returning the thread's name (as `Optional`) for both add() and
> remove() is fine with me.
>
>
> -Matthias
>
> On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
> > Sorry Bruno, I think I missed the end of your message with the
> > numberOfAliveStreamThreads()
> > proposal. I agree, that would be better than the alternatives I listed.
> > That said:
> >
> >> They rather suggest that the method returns a list of handles to the
> > stream threads.
> >
> > I hadn't thought of that originally, but now that you mention it, this
> > might be a good idea.
> > I don't think we should return actual handles on the threads, but maybe a
> > list of the thread
> > names rather than a single number of currently alive threads.
> >
> > Since we seem to think it would be difficult if not impossible to keep
> > track of the number
> > of running stream threads, we should apply the same reasoning to the
> names
> > and not
> > assume the user can/will keep track of every thread returned by
> > addStreamThread() or
> > removeStreamThread(). Users should generally take any required action
> > immediately
> > after adding/removing the thread -- eg deregistering the thread metrics
> --
> > but it might
> > still be useful to provide a convenience method listing all of the
> current
> > threads
> >
> > And of course you could still get the number of threads easily by
> invoking
> > size() on the
> > returned list (or ordered set?).
> >
> > On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna 
> wrote:
> >
> >> Thank you again for the feedback Sophie!
> >>
> >> As I tried to point out in my previous e-mail, removing a stream thread
> >> from a Kafka Streams client that does not have alive stream threads is
> >> nothing exceptional for the client per se. However, it can become
> >> exceptional within the context of the user. For example, if users want
> >> to remove a stream thread from a client without alive stream threads
> >> because one if their metrics say so, then this is exceptional in the
> >> context of that user metric not in the context of the Kafka Streams
> >> client. In that case, users should throw an exception and handle it.
> >>
> >> Regarding returning null, I do not like to return null because from a
> >> development point of view there is no distinction between returning null
> >> because we have a bug in the code or returning null because there are no
> >> alive stream threads. Additionally, Optional makes it more
> >> explicit that the result could also be empty.
> >>
> >> Thank you for the alternative method names! However, with the names you
> >> propose it is not immediately clear that the method returns an amount of
> >> stream threads. They rather suggest that the method returns a list of
> >> handles to the stream threads. I chose to use "aliveStreamThreads" to be
> >> consistent with the client-level metric "alive-stream-threads" which
> >> reports the same number of stream threads that
> >> numberOfAliveStreamThreads() should report. If others also think that
> >> the proposed name in the KIP is too clumsy, I am open to rename it,
> though.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On 08.09.20 20:12, Sophie Blee-Goldman wrote:
>  it's never a good sign when the discussion moves into the vote thread
> >>>
> >>> Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads
> >> strikes
> >>> again.
> >>> Thanks for redirecting me Bruno
> >>>
> >>> I suppose it's unfair to expect the callers to keep perfect track of
> the
> >>> current
> >>>   number of stream threads, but it also seems like you shouldn't be
> >> calling
> >>> removeStreamThread() when there are no threads left. Either you're just
> >>> haphazardly removing threads and could unintentionally slip into a
> state
> >> of
> >>> no
> >>> running threads without realizing it, or more realistically, you're
> >>> carefully
> >>> removing threads based on some metric(s) that convey whether the system
> >> is
> >>> over or under-provisioned. If your metrics say you're over-provisioned
> >> but
> >>> there's
> >>> not one thread running, well, that certainly sounds exceptional to me.
> Or
> >>> you might
> >>> be right in that the cluster is over-provisioned but have just been
> >>> directing the
> >>> removeStreamThread() and addStreamThread() calls to instances at
> random,
> >> and
> >>> end up with one massive instance and one with no threads at all. Again,
> >>> this
> >>> probably 

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-08 Thread Matthias J. Sax
I am not sure if we need a new method? There is already
`localThreadsMetadata()`. What do we gain by adding a new one?

Returning the thread's name (as `Optional`) for both add() and
remove() is fine with me.


-Matthias
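
Matthias's suggestion that both add() and remove() return the thread's name as an `Optional` can be modeled in a few lines. This is a hypothetical, self-contained model for illustration only: `ThreadRegistry`, its `maxThreads` cap, and the `<clientId>-StreamThread-<n>` naming are assumptions, not the Kafka Streams implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical model of a client that owns a set of named stream threads.
final class ThreadRegistry {
    private final String clientId;
    private final int maxThreads;              // assumed cap, for illustration only
    private final Deque<String> alive = new ArrayDeque<>();
    private int nextIndex = 1;

    ThreadRegistry(String clientId, int maxThreads) {
        this.clientId = clientId;
        this.maxThreads = maxThreads;
    }

    // Returns the new thread's name, or empty if no thread can be created right now.
    synchronized Optional<String> addStreamThread() {
        if (alive.size() >= maxThreads) {
            return Optional.empty();
        }
        String name = clientId + "-StreamThread-" + nextIndex++;
        alive.addLast(name);
        return Optional.of(name);
    }

    // Returns the removed thread's name, or empty if no thread is alive.
    synchronized Optional<String> removeStreamThread() {
        return Optional.ofNullable(alive.pollLast());
    }

    synchronized int aliveCount() {
        return alive.size();
    }
}
```

Because emptiness is part of the return type, a caller cannot accidentally dereference a missing name the way it could with a bare `null`.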

On 9/8/20 12:58 PM, Sophie Blee-Goldman wrote:
> Sorry Bruno, I think I missed the end of your message with the
> numberOfAliveStreamThreads()
> proposal. I agree, that would be better than the alternatives I listed.
> That said:
> 
>> They rather suggest that the method returns a list of handles to the
> stream threads.
> 
> I hadn't thought of that originally, but now that you mention it, this
> might be a good idea.
> I don't think we should return actual handles on the threads, but maybe a
> list of the thread
> names rather than a single number of currently alive threads.
> 
> Since we seem to think it would be difficult if not impossible to keep
> track of the number
> of running stream threads, we should apply the same reasoning to the names
> and not
> assume the user can/will keep track of every thread returned by
> addStreamThread() or
> removeStreamThread(). Users should generally take any required action
> immediately
> after adding/removing the thread -- eg deregistering the thread metrics --
> but it might
> still be useful to provide a convenience method listing all of the current
> threads
> 
> And of course you could still get the number of threads easily by invoking
> size() on the
> returned list (or ordered set?).

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-08 Thread Sophie Blee-Goldman
Sorry Bruno, I think I missed the end of your message with the
numberOfAliveStreamThreads() proposal. I agree, that would be better than
the alternatives I listed. That said:

> They rather suggest that the method returns a list of handles to the
> stream threads.

I hadn't thought of that originally, but now that you mention it, this
might be a good idea. I don't think we should return actual handles to the
threads, but maybe a list of the thread names rather than a single number
of currently alive threads.

Since we seem to think it would be difficult if not impossible to keep
track of the number of running stream threads, we should apply the same
reasoning to the names and not assume the user can/will keep track of
every thread returned by addStreamThread() or removeStreamThread(). Users
should generally take any required action immediately after
adding/removing the thread -- e.g., deregistering the thread metrics --
but it might still be useful to provide a convenience method listing all
of the current threads.

And of course you could still get the number of threads easily by invoking
size() on the returned list (or ordered set?).
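
Returning names instead of a count can be sketched as an ordered, read-only snapshot from which the count is just `size()`. `ThreadNameSnapshot` and `currentThreadNames()` are hypothetical names chosen for illustration; insertion order stands in for the "ordered set" Sophie mentions.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical holder of alive thread names, kept in insertion order.
final class ThreadNameSnapshot {
    private final Set<String> names = new LinkedHashSet<>();

    synchronized void onThreadStarted(String name) {
        names.add(name);
    }

    synchronized void onThreadStopped(String name) {
        names.remove(name);
    }

    // Returns a read-only copy, so later changes do not leak into the caller's view.
    synchronized Set<String> currentThreadNames() {
        return Collections.unmodifiableSet(new LinkedHashSet<>(names));
    }
}
```

Returning a copy is a design choice: the caller gets a consistent point-in-time view, at the cost that the view may already be stale when it is read.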

On Tue, Sep 8, 2020 at 12:16 PM Bruno Cadonna  wrote:

> Thank you again for the feedback Sophie!
>
> As I tried to point out in my previous e-mail, removing a stream thread
> from a Kafka Streams client that does not have alive stream threads is
> nothing exceptional for the client per se. However, it can become
> exceptional within the context of the user. For example, if users want
> to remove a stream thread from a client without alive stream threads
> because one of their metrics says so, then this is exceptional in the
> context of that user metric not in the context of the Kafka Streams
> client. In that case, users should throw an exception and handle it.
>
> Regarding returning null, I do not like to return null because from a
> development point of view there is no distinction between returning null
> because we have a bug in the code or returning null because there are no
> alive stream threads. Additionally, Optional makes it more
> explicit that the result could also be empty.
>
> Thank you for the alternative method names! However, with the names you
> propose it is not immediately clear that the method returns an amount of
> stream threads. They rather suggest that the method returns a list of
> handles to the stream threads. I chose to use "aliveStreamThreads" to be
> consistent with the client-level metric "alive-stream-threads" which
> reports the same number of stream threads that
> numberOfAliveStreamThreads() should report. If others also think that
> the proposed name in the KIP is too clumsy, I am open to rename it, though.
>
> Best,
> Bruno

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-08 Thread Bruno Cadonna

Thank you again for the feedback Sophie!

As I tried to point out in my previous e-mail, removing a stream thread 
from a Kafka Streams client that does not have alive stream threads is 
nothing exceptional for the client per se. However, it can become 
exceptional within the context of the user. For example, if users want 
to remove a stream thread from a client without alive stream threads 
because one of their metrics says so, then this is exceptional in the
context of that user metric, not in the context of the Kafka Streams
client. In that case, users should throw an exception and handle it.
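
Bruno's division of responsibility can be sketched as follows: the client reports an empty result, and the caller, whose own metric says it is over-provisioned, is the one that turns emptiness into an exception. This assumes the `Optional<String>` return type discussed in this thread; a `Supplier` stands in for the client's remove method, and `ScaleDownException` and `ScaleDownPolicy` are hypothetical application-level names.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical application-level exception; not part of Kafka Streams.
final class ScaleDownException extends RuntimeException {
    ScaleDownException(String message) {
        super(message);
    }
}

final class ScaleDownPolicy {
    // Called only when the caller's own metric reports over-provisioning,
    // so an empty result is exceptional for the caller, not for the client.
    static String scaleDown(Supplier<Optional<String>> removeStreamThread) {
        return removeStreamThread.get().orElseThrow(() -> new ScaleDownException(
                "metric reports over-provisioning, but no alive stream thread was left to remove"));
    }
}
```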


Regarding returning null, I do not like to return null because from a
development point of view there is no distinction between returning null
because of a bug in the code and returning null because there are no
alive stream threads. Additionally, Optional makes it more
explicit that the result could also be empty.


Thank you for the alternative method names! However, with the names you
propose it is not immediately clear that the method returns a number of
stream threads. They rather suggest that the method returns a list of
handles to the stream threads. I chose to use "aliveStreamThreads" to be
consistent with the client-level metric "alive-stream-threads", which
reports the same number of stream threads that
numberOfAliveStreamThreads() should report. If others also think that
the proposed name in the KIP is too clumsy, I am open to renaming it, though.


Best,
Bruno


On 08.09.20 20:12, Sophie Blee-Goldman wrote:
>> it's never a good sign when the discussion moves into the vote thread
>
> Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes
> again. Thanks for redirecting me, Bruno.
>
> I suppose it's unfair to expect the callers to keep perfect track of the
> current number of stream threads, but it also seems like you shouldn't be
> calling removeStreamThread() when there are no threads left. Either you're
> just haphazardly removing threads and could unintentionally slip into a
> state of no running threads without realizing it, or, more realistically,
> you're carefully removing threads based on some metric(s) that convey
> whether the system is over- or under-provisioned. If your metrics say
> you're over-provisioned but there's not one thread running, well, that
> certainly sounds exceptional to me. Or you might be right that the cluster
> is over-provisioned but have just been directing the removeStreamThread()
> and addStreamThread() calls to instances at random, and end up with one
> massive instance and one with no threads at all. Again, this probably
> merits some human intervention (or system redesign).
>
> That said, I don't think there's any real harm in just returning null in
> this case, but I hope that users would pay attention to this, since it
> seems likely to indicate that something has gone seriously wrong. I
> suppose Optional would be a reasonable compromise.
>
> As for the method name, what about activeStreamThreads() or
> liveStreamThreads()?


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-08 Thread Sophie Blee-Goldman
> it's never a good sign when the discussion moves into the vote thread

Hah, sorry, the gmail consolidation of [VOTE] and [DISCUSS] threads strikes
again. Thanks for redirecting me, Bruno.

I suppose it's unfair to expect the callers to keep perfect track of the
current number of stream threads, but it also seems like you shouldn't be
calling removeStreamThread() when there are no threads left. Either you're
just haphazardly removing threads and could unintentionally slip into a
state of no running threads without realizing it, or, more realistically,
you're carefully removing threads based on some metric(s) that convey
whether the system is over- or under-provisioned. If your metrics say
you're over-provisioned but there's not one thread running, well, that
certainly sounds exceptional to me. Or you might be right that the cluster
is over-provisioned but have just been directing the removeStreamThread()
and addStreamThread() calls to instances at random, and end up with one
massive instance and one with no threads at all. Again, this probably
merits some human intervention (or system redesign).

That said, I don't think there's any real harm in just returning null in
this case, but I hope that users would pay attention to this, since it
seems likely to indicate that something has gone seriously wrong. I
suppose Optional would be a reasonable compromise.

As for the method name, what about activeStreamThreads() or
liveStreamThreads()?
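
The metric-driven case Sophie describes can be sketched as a small decision function. The utilization metric and both thresholds are hypothetical assumptions; the point is that "over-provisioned, yet zero alive threads" is the contradictory combination she calls exceptional, so the sketch flags it for investigation instead of calling removeStreamThread().

```java
// Hypothetical scaling decision; the utilization metric and thresholds are assumptions.
enum ScaleAction { ADD, REMOVE, NONE, INVESTIGATE }

final class ThreadAutoscaler {
    private final double lowUtilization;   // below this: over-provisioned
    private final double highUtilization;  // above this: under-provisioned

    ThreadAutoscaler(double lowUtilization, double highUtilization) {
        this.lowUtilization = lowUtilization;
        this.highUtilization = highUtilization;
    }

    ScaleAction decide(double utilization, int aliveThreads) {
        if (utilization > highUtilization) {
            return ScaleAction.ADD;
        }
        if (utilization < lowUtilization) {
            // Over-provisioned with no threads left is contradictory: flag it
            // for a human rather than attempting another removal.
            return aliveThreads > 0 ? ScaleAction.REMOVE : ScaleAction.INVESTIGATE;
        }
        return ScaleAction.NONE;
    }
}
```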

On Mon, Sep 7, 2020 at 1:45 AM Bruno Cadonna  wrote:

> Hi John,
>
> I agree with you except for checking null. I would prefer to use
> Optional as the return type for both methods.
>
> I changed the subject from [VOTE] to [DISCUSS] so that we can follow up
> in the discussion thread.
>
> Best,
> Bruno

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-07 Thread Bruno Cadonna

Hi Sophie,

Thanks for your feedback!

I replied inline.

I changed the subject from [VOTE] to [DISCUSS] so that we can follow up
in the discussion thread.

Best,
Bruno

On 03.09.20 21:15, Sophie Blee-Goldman wrote:
> Hey, sorry for the late reply, I just have one minor suggestion. Since we
> don't make any guarantees about which thread gets removed or allow the
> user to specify, I think we should return either the index or full name
> of the thread that does get removed by removeThread().

I guess that could be a good idea, to allow users to log which stream
thread was removed and why. Did you have another use case in mind?
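
Logging which thread was removed and why becomes a one-liner once the name comes back. A minimal sketch, assuming the `Optional<String>` return type discussed in this thread; `RemovalLog` and the `reason` string are hypothetical, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.Optional;
import java.util.logging.Logger;

final class RemovalLog {
    private static final Logger LOG = Logger.getLogger(RemovalLog.class.getName());

    // Builds and logs a line naming the removed thread (if any) and the caller's reason.
    static String logRemoval(Optional<String> removed, String reason) {
        String line = removed
                .map(name -> "Removed stream thread " + name + " because " + reason)
                .orElse("No stream thread was removed (requested because " + reason + ")");
        LOG.info(line);
        return line;
    }
}
```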



> I know you just updated the KIP to return true/false if there are/aren't
> any threads to be removed, but I think this would be more appropriate as
> an exception than as a return type. I think it's reasonable to expect
> users to have some sense of how many threads are remaining, and not try
> to remove a thread when there is none left. To me, that indicates
> something wrong with the user application code and should be treated as
> an exceptional case. I don't think the same code clarity argument applies
> here as to the addStreamThread() case, as there's no reason for an
> application to be looping and retrying removeStreamThread(), since if
> that fails, it's because there are no threads left and thus it will
> continue to always fail. And if the user actually wants to shut down all
> threads, they should just close the whole application rather than call
> removeStreamThread() in a loop.

I do not agree that removing a stream thread from a client that does not
have any stream threads is exceptional -- at least not for the Kafka
Streams client. It may be for the caller, but then the caller should
throw an exception.

It is not true that removeStreamThread() will always fail once it starts 
to fail, because a stream thread could be added between two calls to 
removeStreamThread().


I can imagine that users might also want to keep a Kafka Streams client
running without any stream threads, to be able to start new stream threads
faster, especially when a global table is involved, which would still be
kept up to date even without running stream threads.

> While I generally think it should be straightforward for users to track
> how many stream threads they have running, maybe it would be nice to add
> a small utility method that does this for them. Something like
>
> // Returns the number of currently alive threads
> int runningStreamThreads();

I am not completely against this. I would just not call it
runningStreamThreads() because that could be misunderstood as returning
handles to stream threads in state running.
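
The proposed utility method is documented as returning the number of alive threads, so it needs an integer return type. The sketch below assumes, hypothetically, that a single counter backs both the method (named after Bruno's proposed `numberOfAliveStreamThreads()`) and a gauge for an "alive-stream-threads"-style metric, so the two cannot disagree. `AliveThreadCounter` is illustrative and not the Kafka Streams implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical: one counter feeds both the method and a metric-style gauge.
final class AliveThreadCounter {
    private final AtomicInteger alive = new AtomicInteger();

    void onThreadStarted() {
        alive.incrementAndGet();
    }

    void onThreadDied() {
        alive.decrementAndGet();
    }

    // Integer return type, matching "returns the number of currently alive threads".
    int numberOfAliveStreamThreads() {
        return alive.get();
    }

    // A gauge reading the same counter, so the metric and the method agree.
    Supplier<Integer> aliveStreamThreadsGauge() {
        return alive::get;
    }
}
```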



> On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax  wrote:
>
>> +1 (binding)
>>
>> On 9/3/20 6:16 AM, Bruno Cadonna wrote:
>>> Hi,
>>>
>>> I would like to start the voting on KIP-663 that proposes to add methods
>>> to the Kafka Streams client to add and remove stream threads during
>>> execution.
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
>>>
>>> Best,
>>> Bruno

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-07 Thread Bruno Cadonna

Hi John,

I agree with you except for checking null. I would prefer to use
Optional as the return type for both methods.

I changed the subject from [VOTE] to [DISCUSS] so that we can follow up
in the discussion thread.

Best,
Bruno

On 04.09.20 23:12, John Roesler wrote:

Hi Sophie,

Uh, oh, it's never a good sign when the discussion moves
into the vote thread :)

I agree with you, it seems like a good touch for
removeStreamThread() to return the name of the thread that
got removed, rather than a boolean flag. Maybe the return
value would be `null` if there is no thread to remove.

If we go that way, I'd suggest that addStreamThread() also
return the name of the newly created thread, or null if no
thread can be created right now.

I'm not completely sure if I think that callers of this
method would know exactly how many threads there are. Sure,
if a human being is sitting there looking at the metrics or
logs and decides to call the method, it would work out, but
I'd expect this kind of method to find its way into
automated tooling that reacts to things like current system
load or resource saturation. Those kinds of toolchains often
are part of a distributed system, and it's probably not that
easy to guarantee that the thread count they observe is
fully consistent with the number of threads that are
actually running. Therefore, an in-situ `int
numStreamThreads()` method might not be a bad idea. Then
again, it seems sort of optional. A caller can catch an
exception or react to a `null` return value just the same
either way. Having both add/remove methods behave similarly
is probably more valuable.
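A rough sketch of the kind of automated tooling described here, assuming a hypothetical `ScalableClient` interface with an in-situ `numStreamThreads()` and Optional-returning add/remove methods (none of these names are the real API). The loop does not trust a previously observed count: it re-reads the count on every iteration and stops as soon as an add or remove reports that nothing happened.

```java
import java.util.Optional;

// Hypothetical minimal client interface; not the real Kafka Streams API.
interface ScalableClient {
    int numStreamThreads();
    Optional<String> addStreamThread();
    Optional<String> removeStreamThread();
}

// Hypothetical in-memory implementation capped at maxThreads, for demonstration only.
final class CappedClient implements ScalableClient {
    private final int maxThreads;
    private int count;

    CappedClient(final int initial, final int maxThreads) {
        this.count = initial;
        this.maxThreads = maxThreads;
    }

    public int numStreamThreads() {
        return count;
    }

    public Optional<String> addStreamThread() {
        if (count >= maxThreads) {
            return Optional.empty();
        }
        count++;
        return Optional.of("StreamThread-" + count);
    }

    public Optional<String> removeStreamThread() {
        if (count == 0) {
            return Optional.empty();
        }
        final String name = "StreamThread-" + count;
        count--;
        return Optional.of(name);
    }
}

final class ThreadScaler {
    // Moves the client towards the target thread count and returns the count reached.
    static int scaleTo(final ScalableClient client, final int target) {
        while (client.numStreamThreads() < target) {
            if (!client.addStreamThread().isPresent()) {
                break; // nothing could be added right now
            }
        }
        while (client.numStreamThreads() > target) {
            if (!client.removeStreamThread().isPresent()) {
                break; // nothing left to remove
            }
        }
        return client.numStreamThreads();
    }
}
```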

Thanks,
-John


On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman
wrote:

Hey, sorry for the late reply, I just have one minor suggestion. Since we
don't make any guarantees about which thread gets removed, nor allow the
user to specify it, I think we should return either the index or the full
name of the thread that does get removed by removeThread().

I know you just updated the KIP to return true/false if there are/aren't any
threads to be removed, but I think this would be more appropriate as an
exception than as a return type. I think it's reasonable to expect users to
have some sense of how many threads are remaining, and not try to remove
a thread when there is none left. To me, that indicates something wrong
with the user application code and should be treated as an exceptional case.
I don't think the same code clarity argument applies here as to the
addStreamThread() case, as there's no reason for an application to be
looping and retrying removeStreamThread(), since if that fails, it's because
there are no threads left and thus it will continue to always fail. And if
the user actually wants to shut down all threads, they should just close the
whole application rather than call removeStreamThread() in a loop.
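A minimal sketch of the exception-based contract proposed here, using a hypothetical `ThrowingThreadPool` in place of the client. Which thread is removed (here the most recently added one) is an arbitrary choice for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: removing a thread when none is left is treated as a bug in the caller.
final class ThrowingThreadPool {
    private final List<String> threads = new ArrayList<>();

    ThrowingThreadPool(final int initial) {
        for (int i = 1; i <= initial; i++) {
            threads.add("StreamThread-" + i);
        }
    }

    // Returns the name of the removed thread; throws if there is nothing to remove.
    synchronized String removeStreamThread() {
        if (threads.isEmpty()) {
            throw new IllegalStateException("No stream thread left to remove");
        }
        return threads.remove(threads.size() - 1);
    }
}
```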

While I generally think it should be straightforward for users to track how
many stream threads they have running, maybe it would be nice to add
a small utility method that does this for them. Something like

// Returns the number of currently alive threads
int runningStreamThreads();

On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax  wrote:


+1 (binding)

On 9/3/20 6:16 AM, Bruno Cadonna wrote:

Hi,

I would like to start the voting on KIP-663 that proposes to add methods
to the Kafka Streams client to add and remove stream threads during
execution.



https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads


Best,
Bruno




Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-03 Thread Matthias J. Sax
Thanks! SGTM.

-Matthias

On 9/3/20 3:17 AM, Bruno Cadonna wrote:
> Hi Matthias,
> 
> I replied inline.
> 
> Best,
> Bruno
> 
> On 02.09.20 22:06, Matthias J. Sax wrote:
>> Thanks for updating the KIP.
>>
>> Why do you propose to return `boolean` from addStreamThread() if the
>> thread could not be started? As an alternative, we could also throw an
>> exception if the client is not in state RUNNING? -- I guess both are
>> valid options: just want to see what the pros/cons of each approach
>> would be?
>>
> 
> I prefer to return a boolean because it is nothing exceptional if a
> stream thread cannot be added due to an inappropriate state. State
> changes are expected in Streams. Furthermore, users should not be forced
> to control their program flow by catching exceptions. Let me give you
> some examples for returning a boolean and throwing an exception:
> 
> returning a boolean
> 
> while (!kafkaStreams.addStreamThread() &&
>    kafkaStreams.state() != State.NOT_RUNNING &&
>    kafkaStreams.state() != State.ERROR) {
> }
> 
> 
> throwing an exception
> 
> boolean added = false;
> while (!added &&
>        kafkaStreams.state() != State.NOT_RUNNING &&
>        kafkaStreams.state() != State.ERROR) {
>     try {
>         kafkaStreams.addStreamThread();
>         added = true;
>     } catch (final Exception ex) {
>         // do nothing
>     }
> }
> 
> IMO the first example is more readable than the second.
> 
> 
>> Btw: should we allow adding a new thread if the state is REBALANCING,
>> too? I actually don't see a reason why we should not allow this?
>>
> 
> I guess you are right. I will update the KIP and include REBALANCING.
> 
> 
>> For removeStreamThread(), might it be worth actually guaranteeing that
>> the thread with the largest index is stopped instead of leaving it
>> unspecified? It does not seem to be a big burden on the implementation
>> and given that we plan to reuse indices of dead threads, it might be
>> nice to have a contract? Or would there be any negative impact if we
>> guarantee it?
>>
> 
> I left unspecified which stream thread is removed since I could not find
> any good reason for a guarantee. Also, from your comment I do not see what
> advantage we would have if we guaranteed that the stream thread with
> the largest index is stopped. It would not guarantee that the next added
> stream thread would get the largest index, because another stream thread
> with a lower index could have failed in the meanwhile and now two
> indices are up for grabs.
> Leaving unspecified which stream thread is removed also gives us the
> possibility to choose the stream thread to remove according to other
> aspects like for example the one with the least local state.
> 
> 
>> Another thought: should we add a parameter `numberOfThreads` to each
>> method to allow users to start/stop multiple threads at once?
>>
> 
> I would keep it simple for now and add overloads if users request them.
> 
> 
>> What happens if there are zero running threads and one calls
>> removeStreamThread()? Should we also add a `boolean` flag and return
>> `false` for this case (or throw an exception)?
>>
> 
> Yeah, I think this is a good idea for the programmatic removal of all
> threads. However, I would not throw an exception for the reasons I
> pointed out above.
> 
> 
>>
>> For the metric name, I would prefer "failed" over "crashed". Thoughts?
>>
> 
> I think I like "failed" more than "crashed" and it is also more
> consistent with other parts of the code like the
> ProductionExceptionHandlerResponse.FAIL.
> 
> 
>>
>> Side remark for the PR: can we make sure to update the description of
>> `num.stream.threads` to explain that it's the _initial_ number of
>> threads on startup?
>>
> 
> Good point!
> 
>>
>> -Matthias
>>
>>
>> On 9/1/20 2:01 PM, Walker Carlson wrote:
>>> Hi Bruno,
>>>
>>> I read through your updated KIP and it looks good to me. I agree with
>>> adding the metric to keep track of crashed streams in place of a list of
>>> dead streams.
>>>
>>> best,
>>> Wlaker :)
>>>
>>> On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna  wrote:
>>>
 Hi John,

 your proposal makes sense! I will update the KIP.

 Best,
 Bruno

 On 01.09.20 17:31, John Roesler wrote:
> Hello Bruno,
>
> Thanks for the update! The KIP looks good to me; I only have
> a grammatical complaint about the proposed metric name.
>
> "Died" is a verb, the past tense of "to die", but in the
> expression "x stream threads", x should be an adjective. To
> be fair, "died" is also the past participle of "to die", and
> participles can usually be used as adjectives. Maybe it
> sounds wrong to me because there's already a specifically
> adjectival form: "dead". So "dead-stream-threads" seems more
> natural.
>
> However, I'm not sure if that captures the specific meaning
> you're shooting for, namely that the metric counts only the
> threads that died exceptionally, vs. 

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-03 Thread Bruno Cadonna

Hi Matthias,

I replied inline.

Best,
Bruno

On 02.09.20 22:06, Matthias J. Sax wrote:

Thanks for updating the KIP.

Why do you propose to return `boolean` from addStreamThread() if the
thread could not be started? As an alternative, we could also throw an
exception if the client is not in state RUNNING? -- I guess both are
valid options: just want to see what the pros/cons of each approach
would be?



I prefer to return a boolean because it is nothing exceptional if a 
stream thread cannot be added due to an inappropriate state. State 
changes are expected in Streams. Furthermore, users should not be forced 
to control their program flow by catching exceptions. Let me give you 
some examples for returning a boolean and throwing an exception:


returning a boolean

while (!kafkaStreams.addStreamThread() &&
   kafkaStreams.state() != State.NOT_RUNNING &&
   kafkaStreams.state() != State.ERROR) {
}


throwing an exception

boolean added = false;
while (!added &&
       kafkaStreams.state() != State.NOT_RUNNING &&
       kafkaStreams.state() != State.ERROR) {
    try {
        kafkaStreams.addStreamThread();
        added = true;
    } catch (final Exception ex) {
        // do nothing
    }
}

IMO the first example is more readable than the second.
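For illustration, a runnable variant of the first (boolean-returning) loop against a hypothetical `FlakyClient` whose `addStreamThread()` fails a fixed number of times before succeeding. The short sleep between attempts is an addition not in the original example; it avoids spinning on the CPU while, for instance, the client is rebalancing. Like the original, the loop only ends once a thread is added or the client reaches NOT_RUNNING or ERROR.

```java
// Hypothetical client states; a simplified subset, not the real KafkaStreams.State.
enum ClientState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// Hypothetical toy client whose addStreamThread() fails a fixed number of times first.
final class FlakyClient {
    private int remainingFailures;
    private volatile ClientState state = ClientState.RUNNING;

    FlakyClient(final int failures) {
        this.remainingFailures = failures;
    }

    ClientState state() {
        return state;
    }

    void setState(final ClientState newState) {
        state = newState;
    }

    synchronized boolean addStreamThread() {
        if (remainingFailures > 0) {
            remainingFailures--;
            return false;
        }
        return true;
    }
}

final class AddWithRetry {
    // Returns true if a thread was added, false if the client reached a terminal state first.
    static boolean addWithRetry(final FlakyClient client, final long backoffMs) throws InterruptedException {
        while (client.state() != ClientState.NOT_RUNNING && client.state() != ClientState.ERROR) {
            if (client.addStreamThread()) {
                return true;
            }
            Thread.sleep(backoffMs); // back off instead of busy-waiting
        }
        return false;
    }
}
```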



Btw: should we allow adding a new thread if the state is REBALANCING,
too? I actually don't see a reason why we should not allow this?



I guess you are right. I will update the KIP and include REBALANCING.
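A minimal sketch of the state check this implies, using a hypothetical, simplified `StreamsState` enum rather than the real `KafkaStreams.State`:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical simplified client states, not the real KafkaStreams.State enum.
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

final class AddThreadGuard {
    // States in which the discussion proposes allowing a new stream thread to be added.
    private static final Set<StreamsState> ADDABLE =
        EnumSet.of(StreamsState.RUNNING, StreamsState.REBALANCING);

    static boolean canAddStreamThread(final StreamsState state) {
        return ADDABLE.contains(state);
    }
}
```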



For removeStreamThread(), might it be worth actually guaranteeing that
the thread with the largest index is stopped instead of leaving it
unspecified? It does not seem to be a big burden on the implementation
and given that we plan to reuse indices of dead threads, it might be
nice to have a contract? Or would there be any negative impact if we
guarantee it?



I left unspecified which stream thread is removed since I could not find 
any good reason for a guarantee. Also, from your comment I do not see what
advantage we would have if we guaranteed that the stream thread with
the largest index is stopped. It would not guarantee that the next added 
stream thread would get the largest index, because another stream thread 
with a lower index could have failed in the meanwhile and now two 
indices are up for grabs.
Leaving unspecified which stream thread is removed also gives us the 
possibility to choose the stream thread to remove according to other 
aspects like for example the one with the least local state.
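The index argument can be sketched with a hypothetical allocator that always hands out the lowest free index (an assumption about how index reuse might work). Even if removal always stops the thread with the largest index, the next added thread does not necessarily get that index back.

```java
import java.util.TreeSet;

// Hypothetical index allocator: a new thread takes the lowest index not in use,
// so indices of removed or failed threads are reused.
final class ThreadIndexAllocator {
    private final TreeSet<Integer> inUse = new TreeSet<>();

    synchronized int allocate() {
        int index = 1;
        while (inUse.contains(index)) {
            index++;
        }
        inUse.add(index);
        return index;
    }

    // Called when a thread fails or is removed.
    synchronized void release(final int index) {
        inUse.remove(index);
    }

    // The "stop the largest index" policy; assumes at least one thread is in use.
    synchronized int removeLargest() {
        final int largest = inUse.last();
        inUse.remove(largest);
        return largest;
    }
}
```

With threads 1, 2 and 3 running, if thread 1 fails and `removeLargest()` then stops thread 3, the next `allocate()` returns 1, not 3.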




Another thought: should we add a parameter `numberOfThreads` to each
method to allow users to start/stop multiple threads at once?



I would keep it simple for now and add overloads if users request them.
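If such overloads were requested later, they could be layered on top of the single-thread methods. A hypothetical sketch, where `addOne` stands in for a single-thread add that returns the new thread's name or empty; `addStreamThreads(int)` is an illustrative name, not a proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical convenience overload built on a single-thread add method.
final class BulkAdd {
    // Calls addOne up to n times and returns the names of the threads actually added,
    // stopping at the first attempt that adds nothing.
    static List<String> addStreamThreads(final Supplier<Optional<String>> addOne, final int n) {
        final List<String> added = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final Optional<String> name = addOne.get();
            if (!name.isPresent()) {
                break;
            }
            added.add(name.get());
        }
        return added;
    }
}
```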



What happens if there are zero running threads and one calls
removeStreamThread()? Should we also add a `boolean` flag and return
`false` for this case (or throw an exception)?



Yeah, I think this is a good idea for the programmatic removal of all
threads. However, I would not throw an exception for the reasons I 
pointed out above.





For the metric name, I would prefer "failed" over "crashed". Thoughts?



I think I like "failed" more than "crashed" and it is also more 
consistent with other parts of the code like the 
ProductionExceptionHandlerResponse.FAIL.





Side remark for the PR: can we make sure to update the description of
`num.stream.threads` to explain that it's the _initial_ number of
threads on startup?



Good point!



-Matthias


On 9/1/20 2:01 PM, Walker Carlson wrote:

Hi Bruno,

I read through your updated KIP and it looks good to me. I agree with
adding the metric to keep track of crashed streams in place of a list of
dead streams.

best,
Wlaker :)

On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna  wrote:


Hi John,

your proposal makes sense! I will update the KIP.

Best,
Bruno

On 01.09.20 17:31, John Roesler wrote:

Hello Bruno,

Thanks for the update! The KIP looks good to me; I only have
a grammatical complaint about the proposed metric name.

"Died" is a verb, the past tense of "to die", but in the
expression "x stream threads", x should be an adjective. To
be fair, "died" is also the past participle of "to die", and
participles can usually be used as adjectives. Maybe it
sounds wrong to me because there's already a specifically
adjectival form: "dead". So "dead-stream-threads" seems more
natural.

However, I'm not sure if that captures the specific meaning
you're shooting for, namely that the metric counts only the
threads that died exceptionally, vs. from calling
"removeStreamThread()". What do you think of "crashed-
stream-threads" instead?

Thanks,
-John

On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:

Hi,

I updated the KIP with the feedback so far. I removed the API to close
the Kafka Streams client asynchronously, since it should be possible to
avoid the deadlock with the existing method and without a KIP.

Please have a look at the updated KIP and let me know what you think.



Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-02 Thread Matthias J. Sax
Thanks for updating the KIP.

Why do you propose to return `boolean` from addStreamThread() if the
thread could not be started? As an alternative, we could also throw an
exception if the client is not in state RUNNING? -- I guess both are
valid options: just want to see what the pros/cons of each approach
would be?

Btw: should we allow adding a new thread if the state is REBALANCING,
too? I actually don't see a reason why we should not allow this?

For removeStreamThread(), might it be worth actually guaranteeing that
the thread with the largest index is stopped instead of leaving it
unspecified? It does not seem to be a big burden on the implementation
and given that we plan to reuse indices of dead threads, it might be
nice to have a contract? Or would there be any negative impact if we
guarantee it?

Another thought: should we add a parameter `numberOfThreads` to each
method to allow users to start/stop multiple threads at once?

What happens if there are zero running threads and one calls
removeStreamThread()? Should we also add a `boolean` flag and return
`false` for this case (or throw an exception)?


For the metric name, I would prefer "failed" over "crashed". Thoughts?


Side remark for the PR: can we make sure to update the description of
`num.stream.threads` to explain that it's the _initial_ number of
threads on startup?


-Matthias


On 9/1/20 2:01 PM, Walker Carlson wrote:
> Hi Bruno,
> 
> I read through your updated KIP and it looks good to me. I agree with
> adding the metric to keep track of crashed streams in place of a list of
> dead streams.
> 
> best,
> Wlaker :)
> 
> On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna  wrote:
> 
>> Hi John,
>>
>> your proposal makes sense! I will update the KIP.
>>
>> Best,
>> Bruno
>>
>> On 01.09.20 17:31, John Roesler wrote:
>>> Hello Bruno,
>>>
>>> Thanks for the update! The KIP looks good to me; I only have
>>> a grammatical complaint about the proposed metric name.
>>>
>>> "Died" is a verb, the past tense of "to die", but in the
>>> expression "x stream threads", x should be an adjective. To
>>> be fair, "died" is also the past participle of "to die", and
>>> participles can usually be used as adjectives. Maybe it
>>> sounds wrong to me because there's already a specifically
>>> adjectival form: "dead". So "dead-stream-threads" seems more
>>> natural.
>>>
>>> However, I'm not sure if that captures the specific meaning
>>> you're shooting for, namely that the metric counts only the
>>> threads that died exceptionally, vs. from calling
>>> "removeStreamThread()". What do you think of "crashed-
>>> stream-threads" instead?
>>>
>>> Thanks,
>>> -John
>>>
>>> On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:
 Hi,

 I updated the KIP with the feedback so far. I removed the API to close
 the Kafka Streams client asynchronously, since it should be possible to
 avoid the deadlock with the existing method and without a KIP.

 Please have a look at the updated KIP and let me know what you think.


>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads

 Best,
 Bruno

 On 26.08.20 16:31, Bruno Cadonna wrote:
> Hi,
>
> I would like to propose the following KIP to start and shut down stream
> threads during execution as well as to asynchronously shut down a Kafka
> Streams client from an uncaught exception handler.
>
>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients
>
>
> Best,
> Bruno
>>>
>>
> 





Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-01 Thread Walker Carlson
Hi Bruno,

I read through your updated KIP and it looks good to me. I agree with
adding the metric to keep track of crashed streams in place of a list of
dead streams.

best,
Wlaker :)

On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna  wrote:

> Hi John,
>
> your proposal makes sense! I will update the KIP.
>
> Best,
> Bruno
>
> On 01.09.20 17:31, John Roesler wrote:
> > Hello Bruno,
> >
> > Thanks for the update! The KIP looks good to me; I only have
> > a grammatical complaint about the proposed metric name.
> >
> > "Died" is a verb, the past tense of "to die", but in the
> > expression "x stream threads", x should be an adjective. To
> > be fair, "died" is also the past participle of "to die", and
> > participles can usually be used as adjectives. Maybe it
> > sounds wrong to me because there's already a specifically
> > adjectival form: "dead". So "dead-stream-threads" seems more
> > natural.
> >
> > However, I'm not sure if that captures the specific meaning
> > you're shooting for, namely that the metric counts only the
> > threads that died exceptionally, vs. from calling
> > "removeStreamThread()". What do you think of "crashed-
> > stream-threads" instead?
> >
> > Thanks,
> > -John
> >
> > On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:
> >> Hi,
> >>
> >> I updated the KIP with the feedback so far. I removed the API to close
> >> the Kafka Streams client asynchronously, since it should be possible to
> >> avoid the deadlock with the existing method and without a KIP.
> >>
> >> Please have a look at the updated KIP and let me know what you think.
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
> >>
> >> Best,
> >> Bruno
> >>
> >> On 26.08.20 16:31, Bruno Cadonna wrote:
> >>> Hi,
> >>>
> >>> I would like to propose the following KIP to start and shut down stream
> >>> threads during execution as well as to asynchronously shut down a Kafka
> >>> Streams client from an uncaught exception handler.
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients
> >>>
> >>>
> >>> Best,
> >>> Bruno
> >
>


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-01 Thread Bruno Cadonna

Hi John,

your proposal makes sense! I will update the KIP.

Best,
Bruno

On 01.09.20 17:31, John Roesler wrote:

Hello Bruno,

Thanks for the update! The KIP looks good to me; I only have
a grammatical complaint about the proposed metric name.

"Died" is a verb, the past tense of "to die", but in the
expression "x stream threads", x should be an adjective. To
be fair, "died" is also the past participle of "to die", and
participles can usually be used as adjectives. Maybe it
sounds wrong to me because there's already a specifically
adjectival form: "dead". So "dead-stream-threads" seems more
natural.

However, I'm not sure if that captures the specific meaning
you're shooting for, namely that the metric counts only the
threads that died exceptionally, vs. from calling
"removeStreamThread()". What do you think of "crashed-
stream-threads" instead?

Thanks,
-John

On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:

Hi,

I updated the KIP with the feedback so far. I removed the API to close
the Kafka Streams client asynchronously, since it should be possible to
avoid the deadlock with the existing method and without a KIP.

Please have a look at the updated KIP and let me know what you think.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads

Best,
Bruno

On 26.08.20 16:31, Bruno Cadonna wrote:

Hi,

I would like to propose the following KIP to start and shut down stream
threads during execution as well as to asynchronously shut down a Kafka
Streams client from an uncaught exception handler.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients


Best,
Bruno




Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-01 Thread John Roesler
Hello Bruno,

Thanks for the update! The KIP looks good to me; I only have
a grammatical complaint about the proposed metric name.

"Died" is a verb, the past tense of "to die", but in the
expression "x stream threads", x should be an adjective. To
be fair, "died" is also the past participle of "to die", and
participles can usually be used as adjectives. Maybe it
sounds wrong to me because there's already a specifically
adjectival form: "dead". So "dead-stream-threads" seems more
natural.

However, I'm not sure if that captures the specific meaning
you're shooting for, namely that the metric counts only the
threads that died exceptionally, vs. from calling
"removeStreamThread()". What do you think of "crashed-
stream-threads" instead?
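Whatever the final name, the distinction drawn here can be sketched as follows: the counter is incremented only when a thread exits because of an uncaught exception, not when it is stopped via removeStreamThread(). The `Exit` enum and the class are hypothetical bookkeeping, not the Kafka Streams metrics API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bookkeeping for a "failed/crashed-stream-threads" style metric.
final class FailedThreadsMetric {
    enum Exit { FAILED, REMOVED }

    private final AtomicInteger failedThreads = new AtomicInteger();

    // Only exceptional deaths are counted; deliberate removals are ignored.
    void onThreadExit(final Exit reason) {
        if (reason == Exit.FAILED) {
            failedThreads.incrementAndGet();
        }
    }

    int value() {
        return failedThreads.get();
    }
}
```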

Thanks,
-John

On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:
> Hi,
> 
> I updated the KIP with the feedback so far. I removed the API to close 
> the Kafka Streams client asynchronously, since it should be possible to 
> avoid the deadlock with the existing method and without a KIP.
> 
> Please have a look at the updated KIP and let me know what you think.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
> 
> Best,
> Bruno
> 
> On 26.08.20 16:31, Bruno Cadonna wrote:
> > Hi,
> > 
> > I would like to propose the following KIP to start and shut down stream 
> > threads during execution as well as to asynchronously shut down a Kafka
> > Streams client from an uncaught exception handler.
> > 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients
> >  
> > 
> > 
> > Best,
> > Bruno



Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-01 Thread Bruno Cadonna

Hi,

I updated the KIP with the feedback so far. I removed the API to close 
the Kafka Streams client asynchronously, since it should be possible to 
avoid the deadlock with the existing method and without a KIP.


Please have a look at the updated KIP and let me know what you think.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads

Best,
Bruno

On 26.08.20 16:31, Bruno Cadonna wrote:

Hi,

I would like to propose the following KIP to start and shut down stream 
threads during execution as well as to asynchronously shut down a Kafka
Streams client from an uncaught exception handler.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients 



Best,
Bruno


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-27 Thread Bruno Cadonna

Thank you Matthias for the feedback.

Please find my answers inline.

Best,
Bruno

On 26.08.20 19:54, Matthias J. Sax wrote:

Thanks for the KIP Bruno.

While reading it, I had the same questions as raised by John and Walker
(so I won't repeat them).

In addition, I think that adding/removing threads should only be allowed
if the client state is RUNNING (but not in any other state, maybe except
ERROR). Furthermore, it seems that the methods must be `synchronized`
similar to `start()` and `close()`.



Good point about only adding stream threads in client state RUNNING. I 
will add that to the KIP.



IMO `synchronized` is an implementation detail that we should discuss on
the PR.




While I understand that current `close(Duration.ZERO)` is not the same
as `requestClose()`, I am wondering if we should change the semantics of
`close()` instead of adding a new method though?



As I wrote in my previous e-mails, I will rethink the addition of method 
requestClose().




Btw: for thread naming, I personally think that just using a counter (as
we do right now) might be ok. If this becomes an issue, we could improve
it later.


-Matthias

On 8/26/20 9:46 AM, Walker Carlson wrote:

Hello Burno,

Thanks for the KIP!

Not to pile on, but I also had a couple of additional questions. I am not
super familiar with the StreamThread internals so please forgive any
misconceptions if these are not relevant questions.

1. In requestClose(), if a thread does not close properly and deadlocks for
some reason, how will we avoid the client waiting on the thread to close,
as happens now when you try to close the Kafka Streams client from a stream
thread's UncaughtExceptionHandler?

The KIP said it would improve the handling of these conditions; however, I
did not find it clear what strategy this improvement would use. Maybe
handling broken threads is out of the scope of this KIP, or am I missing
something?

2a. Will the removal of Stream Threads in state DEAD be automatic? And will
it be for all in that state or just for those closed with
shutDownStreamThread?

2b. From the wording it seems that removing DEAD threads from the Kafka
Streams client will be a new feature of this KIP. If that is the case, is
there a reasonable possibility that keeping the dead threads in the metadata
might be useful? For example, if a thread is continually erroring and
restarting a replacement.

3. Maybe instead of addThread() we could use startNewThread()?
I agree with John that startStreamThread could easily be misinterpreted,
e.g. as startStreamThreads.

Thanks,
Walker

On Wed, Aug 26, 2020 at 8:48 AM John Roesler  wrote:


Hi Bruno,

Thanks for the well-motivated and thorough KIP!

It's a good point that the record cache should be re-
distributed over the threads.

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users
who don't read the docs too carefully might think they need
to call "start thread" to start their application's threads
after calling `KafkaStreams.start` to start the app.
Definitely not sure about this, but I'm wondering if it
would be more clear to say `addThread` and
`remove/dropThread` to make it clear that we are adding to
or subtracting from the total number of threads, not just
starting and stopping threads that are already in the pool.

2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why
we need a new method, though? The specified behavior seems
the same as `close(Duration.ZERO)`.

3. Thread Naming

Maybe this point is silly, but how will newly added threads
be numbered? Will dead and hence removed threads' names be
reused? Or will there be a monotonic counter for the
lifetime of the instance?

It seems relevant to mention this because it will affect
metrics and logs. I guess _maybe_ it would be nice for the
thread that replaces a crashed thread to take over its name,
but since the crashed thread still exists while the
UncaughtExceptionHandler is executing, its name wouldn't be
up for grabs in any case yet.

On the other hand, it might be nicer for operators to be
able to distinguish the logs/metrics of the replacement
thread from the dead one, so not reusing thread names might
be better.

On the other hand, not reusing thread names in a
"replacement" exception handler means that a crashy
application would report an unbounded number of thread ids
over its lifespan. This might be an issue for people using
popular metrics aggregation services that charge per unique
combination of metrics tags. Then again, maybe this is a
pathological case not worth considering.

And yes, I realized I just implied that I have three hands.
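The metrics-cardinality concern can be illustrated with a hypothetical comparison of the two naming schemes when one thread keeps crashing and being replaced. The `StreamThread-N` names are illustrative, and the reuse variant assumes the replacement is created only after the crashed thread has terminated, since its name is not free while the UncaughtExceptionHandler is still running.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical comparison of thread-naming schemes in a "crashy" application.
final class ThreadNaming {
    // Monotonic counter: every replacement gets a fresh, never-seen name.
    static Set<String> namesWithMonotonicCounter(final int threads, final int crashes) {
        final Set<String> seen = new HashSet<>();
        int counter = 0;
        for (int i = 0; i < threads; i++) {
            seen.add("StreamThread-" + (++counter));
        }
        for (int i = 0; i < crashes; i++) {
            seen.add("StreamThread-" + (++counter)); // replacement never reuses a name
        }
        return seen;
    }

    // Reuse: the replacement takes over the name of the thread it replaces.
    static Set<String> namesWithReuse(final int threads, final int crashes) {
        final Set<String> seen = new HashSet<>();
        for (int i = 1; i <= threads; i++) {
            seen.add("StreamThread-" + i);
        }
        for (int i = 0; i < crashes; i++) {
            seen.add("StreamThread-1"); // the crashed thread's name is reused
        }
        return seen;
    }
}
```

The number of distinct names, and hence of distinct metric tag values, grows with every crash under the monotonic counter but stays bounded under reuse.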

4. ERROR State

Can you elaborate why users explicitly stopping all threads
should put the application into ERROR state? It does seem
like it's not exactly "running" at that point, but it also
doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users
must discard the application instance 

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-27 Thread Bruno Cadonna

Hi Wlaker (pun intended),

Thank you for your feedback! Please find my answers inline.

Best,
Bruno

On 26.08.20 18:46, Walker Carlson wrote:

Hello Burno,

Thanks for the KIP!

Not to pile on, but I also had a couple of additional questions. I am not
super familiar with the StreamThread internals so please forgive any
misconceptions if these are not relevant questions.

1. In requestClose(), if a thread does not close properly and deadlocks for
some reason, how will we avoid the client waiting on the thread to close,
as happens now when you try to close the Kafka Streams client from a stream
thread's UncaughtExceptionHandler?

The KIP said it would improve the handling of these conditions; however, I
did not find it clear what strategy this improvement would use. Maybe
handling broken threads is out of the scope of this KIP, or am I missing
something?



The main goal of requestClose() is to avoid the deadlock when closing 
the Kafka Streams client from the uncaught exception handler of a stream 
thread. The case in which the close may encounter broken stream threads 
is orthogonal to this KIP, IMO.
But as stated in my previous e-mail, I will rethink the necessity of a 
method requestClose().
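A minimal sketch of the deadlock and of how a close request avoids it, using a hypothetical `ToyClosingClient` (not the real KafkaStreams class) whose blocking close() joins every stream thread. Handing the blocking close to a separate thread is one possible approach, assumed here for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical toy client, not the real KafkaStreams class.
final class ToyClosingClient {
    private final List<Thread> streamThreads = new CopyOnWriteArrayList<>();

    void register(final Thread streamThread) {
        streamThreads.add(streamThread);
    }

    // Blocking close: waits for every stream thread to terminate. If called from a
    // stream thread's uncaught exception handler, that thread would join itself,
    // and Thread.join() on the current thread never returns.
    void close() throws InterruptedException {
        for (final Thread t : streamThreads) {
            t.join();
        }
    }

    // Non-blocking close request: the blocking close runs on a separate thread,
    // so the handler returns immediately and the failing stream thread can terminate.
    Thread requestClose() {
        final Thread closer = new Thread(() -> {
            try {
                close();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-closer");
        closer.start();
        return closer;
    }
}
```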



2a. Will the removal of Stream Threads in state DEAD be automatic? And will
it be for all in that state or just for those closed with
shutDownStreamThread?



Yes, it would be automatic and it would be for all. I will add some words
about that in the KIP.




2b. From the wording it seems that removing DEAD threads from the Kafka
Streams client will be a new feature of this KIP. If that is the case, is
there a reasonable possibility that keeping the dead threads in the metadata
might be useful? For example, if a thread is continually erroring and
restarting a replacement.



Yes, that is a good question. I do not know how reasonable it is to keep
DEAD stream threads around. However, what we could do is introduce a
metric in the KIP to keep track of the stream threads that have died.
IMO, such a metric is better than keeping around DEAD stream threads. I
will add the metric to the KIP.




3. Maybe instead of addThread() we could use startNewThread()?
I agree with John that startStreamThread could easily be misinterpreted,
e.g. as startStreamThreads.



I will change the names in the KIP.


Thanks,
Walker

On Wed, Aug 26, 2020 at 8:48 AM John Roesler  wrote:


Hi Bruno,

Thanks for the well-motivated and thorough KIP!

It's a good point that the record cache should be re-
distributed over the threads.

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users
who don't read the docs too carefully might think they need
to call "start thread" to start their application's threads
after calling `KafkaStreams.start` to start the app.
Definitely not sure about this, but I'm wondering if it
would be more clear to say `addThread` and
`remove/dropThread` to make it clear that we are adding to
or subtracting from the total number of threads, not just
starting and stopping threads that are already in the pool.

2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why
we need a new method, though? The specified behavior seems
the same as `close(Duration.ZERO)`.

3. Thread Naming

Maybe this point is silly, but how will newly added threads
be numbered? Will dead and hence removed threads' names be
reused? Or will there be a monotonic counter for the
lifetime of the instance?

It seems relevant to mention this because it will affect
metrics and logs. I guess _maybe_ it would be nice for the
thread that replaces a crashed thread to take over its name,
but since the crashed thread still exists while the
UncaughtExceptionHandler is executing, its name wouldn't be
up for grabs in any case yet.

On the other hand, it might be nicer for operators to be
able to distinguish the logs/metrics of the replacement
thread from the dead one, so not reusing thread names might
be better.

On the other hand, not reusing thread names in a
"replacement" exception handler means that a crashy
application would report an unbounded number of thread ids
over its lifespan. This might be an issue for people using
popular metrics aggregation services that charge per unique
combination of metrics tags. Then again, maybe this is a
pathological case not worth considering.

And yes, I realized I just implied that I have three hands.

4. ERROR State

Can you elaborate why users explicitly stopping all threads
should put the application into ERROR state? It does seem
like it's not exactly "running" at that point, but it also
doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users
must discard the application instance and create a new one.
If there is still a possiblity that we'd need a terminally
corrupted state, it would probably be a mistake to add an
out-transition from it.

The documentation on that state says that it happens when
all 

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-27 Thread Bruno Cadonna

Hi John,

Thank you for your feedback!

Please find my answers inline.

Best,
Bruno

On 26.08.20 17:49, John Roesler wrote:

> Hi Bruno,
>
> Thanks for the well-motivated and thorough KIP!
>
> It's a good point that the record cache should be
> redistributed over the threads.
>
> Reading your KIP leads me to a few questions:
>
> 1. Start vs. Add
>
> Maybe this is paranoid, but I'm mildly concerned that users
> who don't read the docs too carefully might think they need
> to call "start thread" to start their application's threads
> after calling `KafkaStreams.start` to start the app.
> Definitely not sure about this, but I'm wondering if it
> would be more clear to say `addThread` and
> `remove/dropThread` to make it clear that we are adding to
> or subtracting from the total number of threads, not just
> starting and stopping threads that are already in the pool.



This makes sense to me. I will rename the methods.


> 2. requestClose() vs. close(Duration.ZERO)
>
> It's a very good point about deadlocks. Can you explain why
> we need a new method, though? The specified behavior seems
> the same as `close(Duration.ZERO)`.



You are right, `close(Duration.ZERO)` would avoid the deadlock. However,
`close(Duration.ZERO)` does not guarantee that all resources get
closed.
Furthermore, it is error-prone to rely on users to pick the correct
overload. One way to keep users from picking the wrong overload is
to check in `close()` whether the calling thread is a stream thread and,
in that case, to call `close(0)` instead of `close(Long.MAX_VALUE)`. Maybe
I could also find a solution for the resource issue. Let me think about
it. In conclusion, I agree that a new method may not be needed.
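A hypothetical Java sketch of the idea of having `close()` check whether it is called from one of the client's own threads. `SketchClient` and its worker threads are stand-ins invented for illustration, not Kafka Streams internals. The point is only that a thread that blocks until all threads, itself included, have stopped can never return, so the sketch falls back to a zero timeout, the equivalent of `close(0)`.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical client, NOT Kafka Streams' implementation: it only shows how
// close() could pick a non-blocking path when called from one of its own threads.
final class SketchClient {
    private final List<Thread> workers = new CopyOnWriteArrayList<>();
    private final CountDownLatch shutdownSignal = new CountDownLatch(1);

    // Each worker runs `work` once, then idles until shutdown is signalled.
    void startWorkers(int n, Runnable work) {
        for (int i = 1; i <= n; i++) {
            Thread t = new Thread(() -> {
                work.run();
                awaitShutdown();
            }, "worker-" + i);
            workers.add(t); // register before start so contains() sees it
            t.start();
        }
    }

    private void awaitShutdown() {
        try {
            shutdownSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Signals shutdown and waits up to `timeout`; true if all workers have stopped. */
    boolean close(Duration timeout) {
        shutdownSignal.countDown();
        long deadline = System.nanoTime() + timeout.toNanos(); // assumes a modest timeout
        try {
            for (Thread t : workers) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) break; // Duration.ZERO: never wait
                t.join(remainingMs);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workers.stream().noneMatch(Thread::isAlive);
    }

    /** Waits indefinitely, unless called from a worker, where waiting would deadlock. */
    boolean close() {
        if (workers.contains(Thread.currentThread())) {
            return close(Duration.ZERO); // like calling close(0) instead of blocking forever
        }
        shutdownSignal.countDown();
        try {
            for (Thread t : workers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    Thread worker(int index) {
        return workers.get(index);
    }
}
```

The sketch does not address the resource issue mentioned above: a worker that takes the zero-timeout path returns before cleanup has finished.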



> 3. Thread Naming
>
> Maybe this point is silly, but how will newly added threads
> be numbered? Will dead and hence removed threads' names be
> reused? Or will there be a monotonic counter for the
> lifetime of the instance?
>
> It seems relevant to mention this because it will affect
> metrics and logs. I guess _maybe_ it would be nice for the
> thread that replaces a crashed thread to take over its name,
> but since the crashed thread still exists while the
> UncaughtExceptionHandler is executing, its name wouldn't be
> up for grabs in any case yet.
>
> On the other hand, it might be nicer for operators to be
> able to distinguish the logs/metrics of the replacement
> thread from the dead one, so not reusing thread names might
> be better.
>
> On the other hand, not reusing thread names in a
> "replacement" exception handler means that a crashy
> application would report an unbounded number of thread ids
> over its lifespan. This might be an issue for people using
> popular metrics aggregation services that charge per unique
> combination of metrics tags. Then again, maybe this is a
> pathological case not worth considering.
>
> And yes, I realized I just implied that I have three hands.



We definitely cannot reuse the name of a crashed stream thread
immediately. What we could do is keep a list of previously used names
that are free now (or second-hand names, to take up your hand metaphor)
and reuse them for new stream threads. If there are no second-hand names,
a counter is incremented and a new name is created. The list of second-hand
names would be bounded by the maximum number of stream threads that were
running concurrently. I guess that would not be too complex to
implement and would avoid the pathological case. IMO, it would be less
annoying to see a new stream thread's metrics in the graph of a
stream thread that previously crashed than to run into the pathological
case that costs money.
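A minimal Java sketch of the "second-hand names" scheme, assuming thread names of the form `<prefix>-StreamThread-<n>` (an assumption for illustration). Freed ids are reused smallest-first, and the counter only grows when no id is free. As a result, the number of distinct names ever handed out equals the peak number of concurrently running threads. `ThreadNameAllocator` is a hypothetical class.

```java
import java.util.TreeSet;

// Hypothetical helper; the name format is an assumption for illustration.
final class ThreadNameAllocator {
    private final String prefix;
    private final TreeSet<Integer> freeIds = new TreeSet<>(); // "second-hand" ids
    private int nextId = 1;

    ThreadNameAllocator(String prefix) {
        this.prefix = prefix;
    }

    /** Prefers the smallest second-hand id; otherwise mints a new one from the counter. */
    synchronized String allocate() {
        int id = freeIds.isEmpty() ? nextId++ : freeIds.pollFirst();
        return prefix + "-StreamThread-" + id;
    }

    /** Call only once the old thread is fully gone, e.g. after its exception handler ran. */
    synchronized void release(String name) {
        int id = Integer.parseInt(name.substring(name.lastIndexOf('-') + 1));
        freeIds.add(id);
    }
}
```

For example, with threads 1 and 2 running, a replacement for a crashed (and released) thread 1 gets the name ending in `-1` again, and only an additional thread would get `-3`.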




> 4. ERROR State
>
> Can you elaborate why users explicitly stopping all threads
> should put the application into ERROR state? It does seem
> like it's not exactly "running" at that point, but it also
> doesn't seem like an error.
>
> Right now, ERROR is a terminal state that indicates users
> must discard the application instance and create a new one.
> If there is still a possibility that we'd need a terminally
> corrupted state, it would probably be a mistake to add an
> out-transition from it.
>
> The documentation on that state says that it happens when
> all StreamThreads die or when the Global thread dies. We
> have a proposal from Navinder (KIP-406) to allow the Global
> thread to automatically come back to life after some errors,
> but presumably others would still be fatal.
>
> I guess your reasoning is that if the cause of the ERROR
> state happens to be just from all the StreamThreads dying,
> and if there's a way to replace them, then it should be
> possible to recover from this ERROR state.
>
> Maybe that makes sense, and we should just note that if the
> Global thread is dead, it won't be possible to transition
> out of ERROR state.



Here, I was torn between ERROR and RUNNING.
I guess the key question here is: What is the meaning of ERROR?
Is it a terminal state that signals a fatal error that cannot be
recovered without a restart of the client? If yes, there should not be
any transition from ERROR to any state. I

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-26 Thread Matthias J. Sax
Thanks for the KIP Bruno.

While reading it, I had the same questions as raised by John and Walker
(so I won't repeat them).

In addition, I think that adding/removing threads should only be allowed
if the client state is RUNNING (but not in any other state, maybe except
ERROR). Furthermore, it seems that the methods must be `synchronized`,
similar to `start()` and `close()`.
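A hypothetical Java sketch of both suggestions. Resizing is only allowed in RUNNING (whether ERROR should also qualify is left open in this thread), and the methods are `synchronized` on the client, like `start()` and `close()`, so resizing cannot interleave with them. The state names are modeled on the Kafka Streams client states, but `ResizableClient`, `addThread()`, and `removeThread()` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not Kafka Streams code; state names modeled on the client states.
enum ClientState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

final class ResizableClient {
    private ClientState state = ClientState.CREATED;
    private final List<String> threads = new ArrayList<>();
    private int counter = 0;

    synchronized void setState(ClientState s) {
        state = s;
    }

    private boolean resizeAllowed() {
        return state == ClientState.RUNNING; // possibly also ERROR: still open in the thread
    }

    /** Returns the new thread's name, or empty if resizing is not allowed right now. */
    synchronized Optional<String> addThread() {
        if (!resizeAllowed()) return Optional.empty();
        String name = "StreamThread-" + (++counter);
        threads.add(name);
        return Optional.of(name);
    }

    /** Returns the removed thread's name, or empty if nothing could be removed. */
    synchronized Optional<String> removeThread() {
        if (!resizeAllowed() || threads.isEmpty()) return Optional.empty();
        return Optional.of(threads.remove(threads.size() - 1));
    }

    synchronized int threadCount() {
        return threads.size();
    }
}
```

Returning an empty result rather than throwing is a choice of this sketch; an exception for calls in the wrong state would be an equally reasonable design.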

While I understand that the current `close(Duration.ZERO)` is not the same
as `requestClose()`, I am wondering if we should change the semantics of
`close()` instead of adding a new method though?


Btw: for thread naming, I personally think that just using a counter (as
we do right now) might be ok. If this becomes an issue, we could improve
it later.


-Matthias


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-26 Thread Walker Carlson
Hello Bruno,

Thanks for the KIP!

Not to pile on, but I also had a couple additional questions. I am not
super familiar with the StreamThread internals so please forgive any
misconceptions if these are not relevant questions.

1. In requestClose(), if a thread does not close properly and deadlocks for
some reason, how will we avoid the client waiting on the thread to close,
like when you try to close the Kafka Streams client from a thread's
UncaughtExceptionHandler now?

The KIP said it would improve the handling of these conditions; however, I
did not find it clear what strategy this improvement would use. Maybe
handling broken threads is out of the scope of this KIP, or am I missing
something?

2a. Will the removal of Stream Threads in state DEAD be automatic? And will
it be for all in that state or just for those closed with
shutDownStreamThread?

2b. From the wording it seems that removing DEAD threads from the Kafka
Streams client will be a new feature of this KIP. If that is the case, is
there a reasonable possibility that keeping the dead threads in metadata might
be useful? For example, if a thread is continually erroring and restarting a
replacement

3. Maybe instead of addThread() we could use startNewThread()?
I agree with John that startStreamThread could easily be misinterpreted
i.e. as startStreamThreads.

Thanks,
Walker


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-26 Thread John Roesler
Hi Bruno,

Thanks for the well-motivated and thorough KIP!

It's a good point that the record cache should be
redistributed over the threads.
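To make the cache point concrete, here is a small Java sketch. It assumes that the total record cache budget (`cache.max.bytes.buffering`) is split evenly across the stream threads, plus the global thread if there is one, and that the split is recomputed whenever a thread is added or removed. `CacheBudget` is a hypothetical helper, and the exact formula should be checked against the Kafka Streams source.

```java
// Hypothetical helper, not Kafka Streams code: even split of a total cache budget.
final class CacheBudget {
    private final long totalBytes;

    CacheBudget(long totalBytes) {
        if (totalBytes < 0) throw new IllegalArgumentException("negative cache size");
        this.totalBytes = totalBytes;
    }

    /** Bytes each thread gets; recompute whenever the thread count changes. */
    long perThreadBytes(int streamThreads, boolean hasGlobalThread) {
        if (streamThreads < 0) throw new IllegalArgumentException("negative thread count");
        int shares = streamThreads + (hasGlobalThread ? 1 : 0);
        // With no threads there is nothing to split; this sketch reports the whole budget.
        return shares == 0 ? totalBytes : totalBytes / shares;
    }
}
```

For example, with a 10 MiB budget and no global thread, going from two to three stream threads shrinks each thread's share from 5,242,880 to 3,495,253 bytes (integer division).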

Reading your KIP leads me to a few questions:

1. Start vs. Add

Maybe this is paranoid, but I'm mildly concerned that users
who don't read the docs too carefully might think they need
to call "start thread" to start their application's threads
after calling `KafkaStreams.start` to start the app.
Definitely not sure about this, but I'm wondering if it
would be more clear to say `addThread` and
`remove/dropThread` to make it clear that we are adding to
or subtracting from the total number of threads, not just
starting and stopping threads that are already in the pool.

2. requestClose() vs. close(Duration.ZERO)

It's a very good point about deadlocks. Can you explain why
we need a new method, though? The specified behavior seems
the same as `close(Duration.ZERO)`.

3. Thread Naming

Maybe this point is silly, but how will newly added threads
be numbered? Will dead and hence removed threads' names be
reused? Or will there be a monotonic counter for the
lifetime of the instance?

It seems relevant to mention this because it will affect
metrics and logs. I guess _maybe_ it would be nice for the
thread that replaces a crashed thread to take over its name,
but since the crashed thread still exists while the
UncaughtExceptionHandler is executing, its name wouldn't be
up for grabs in any case yet.

On the other hand, it might be nicer for operators to be
able to distinguish the logs/metrics of the replacement
thread from the dead one, so not reusing thread names might
be better.

On the other hand, not reusing thread names in a
"replacement" exception handler means that a crashy
application would report an unbounded number of thread ids
over its lifespan. This might be an issue for people using
popular metrics aggregation services that charge per unique
combination of metrics tags. Then again, maybe this is a
pathological case not worth considering.

And yes, I realized I just implied that I have three hands.

4. ERROR State

Can you elaborate why users explicitly stopping all threads
should put the application into ERROR state? It does seem
like it's not exactly "running" at that point, but it also
doesn't seem like an error.

Right now, ERROR is a terminal state that indicates users
must discard the application instance and create a new one.
If there is still a possibility that we'd need a terminally
corrupted state, it would probably be a mistake to add an
out-transition from it.

The documentation on that state says that it happens when
all StreamThreads die or when the Global thread dies. We
have a proposal from Navinder (KIP-406) to allow the Global
thread to automatically come back to life after some errors,
but presumably others would still be fatal.

I guess your reasoning is that if the cause of the ERROR
state happens to be just from all the StreamThreads dying,
and if there's a way to replace them, then it should be
possible to recover from this ERROR state.

Maybe that makes sense, and we should just note that if the
Global thread is dead, it won't be possible to transition
out of ERROR state.
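A hypothetical Java sketch of the transition rule suggested above. The base table is a simplified, illustrative version of the client state machine, not the exact Kafka Streams table. The only new edge is ERROR to REBALANCING. Choosing REBALANCING as the target is an assumption, since adding a thread would trigger a rebalance. That edge is permitted only while the global thread is alive.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified, illustrative client states (not the exact Kafka Streams table).
enum AppState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

final class Transitions {
    private static final Map<AppState, Set<AppState>> BASE = new EnumMap<>(AppState.class);

    static {
        BASE.put(AppState.CREATED, EnumSet.of(AppState.REBALANCING, AppState.PENDING_SHUTDOWN));
        BASE.put(AppState.REBALANCING,
                EnumSet.of(AppState.RUNNING, AppState.PENDING_SHUTDOWN, AppState.ERROR));
        BASE.put(AppState.RUNNING,
                EnumSet.of(AppState.REBALANCING, AppState.PENDING_SHUTDOWN, AppState.ERROR));
        BASE.put(AppState.PENDING_SHUTDOWN, EnumSet.of(AppState.NOT_RUNNING));
        BASE.put(AppState.NOT_RUNNING, EnumSet.noneOf(AppState.class));
        // In this table ERROR only leads to shutdown.
        BASE.put(AppState.ERROR, EnumSet.of(AppState.PENDING_SHUTDOWN));
    }

    /**
     * Proposed extra edge (assumption): after a stream thread is added, ERROR may move
     * to REBALANCING, but never while the global thread is dead.
     */
    static boolean isValid(AppState from, AppState to, boolean globalThreadDead) {
        if (from == AppState.ERROR && to == AppState.REBALANCING) {
            return !globalThreadDead;
        }
        return BASE.get(from).contains(to);
    }
}
```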

5. Global thread

Should it be in the scope of this KIP to consider replacing
the GlobalStreamThread?

6. Restarting

It seems like, if I configure Streams to have e.g. 1 thread
and then add more threads over its lifetime (maybe up to 8),
then I might be dismayed after a restart of the app to see
it went back to 1.

I guess it could just be my job as a user to update the
config to match when I add or remove threads. Maybe that's
the best place to land right now. It still might be a good
point to mention in the KIP (and the docs).
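If keeping the config in sync is left to users, one option is to write the current thread count back to the application's properties after resizing. Below is a minimal Java sketch that assumes the properties are kept in plain `.properties` text (a `String` here, to keep it self-contained). `num.stream.threads` is the real Kafka Streams config key, and the fallback of 1 mirrors its default. `ThreadCountConfig` and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper (not Kafka API): keeps num.stream.threads in sync with
// the number of threads actually running, so a restart starts with the same count.
final class ThreadCountConfig {
    static final String NUM_STREAM_THREADS = "num.stream.threads";

    /** Writes the live thread count into props and returns them in .properties format. */
    static String persist(Properties props, int liveThreads) {
        props.setProperty(NUM_STREAM_THREADS, Integer.toString(liveThreads));
        StringWriter out = new StringWriter();
        try {
            props.store(out, "updated after adding/removing stream threads");
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for a StringWriter
        }
        return out.toString();
    }

    /** Reads the thread count back, falling back to 1 (the config's default). */
    static int load(String stored) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(stored));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Integer.parseInt(props.getProperty(NUM_STREAM_THREADS, "1"));
    }
}
```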


Thanks again!
-John





[DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-08-26 Thread Bruno Cadonna

Hi,

I would like to propose the following KIP to start and shut down stream
threads during execution as well as to asynchronously shut down a Kafka
Streams client from an uncaught exception handler.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients 



Best,
Bruno