Hi Vasily,

Thanks for the email.

To answer your question: you should reset the application basically any
time you change the topology. Some transitions are safe, but others will
result in data loss or corruption. Rather than try to reason about which is
which, it's much safer just to either reset the app or not change it (if it
has important state).

Beyond changes that you make to the topology, we spend a lot of effort to
try and make sure that different versions of Streams will produce the same
topology, so unless the release notes say otherwise, you should be able to
upgrade without a reset.


I can't say right now whether those wacky behaviors are bugs, the result
of changing the topology without a reset, or correct but surprising
behavior. Do feel free to open a Jira ticket if you think you have found
a bug, especially if you can
describe a repro. Knowing your topology before and after the change would
also be immensely helpful. You can print it with Topology.describe().

Regardless, I'll make a note to take a look at the code tomorrow and try to
decide if you should expect these behaviors with "clean" topology changes.
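
In the meantime, here is a rough standalone model of the reduce step as
you describe it, just to pin down which cases we are talking about. It is
a sketch, not the actual Streams source: the class name ReduceModel, the
in-memory map standing in for the state store, and the add-then-subtract
order are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Toy model of a table reduce step; hypothetical, not Kafka Streams code. */
class ReduceModel {
    // Stand-in for the state store holding the current aggregate per key.
    private final Map<String, Long> store = new HashMap<>();
    private final BinaryOperator<Long> adder;
    private final BinaryOperator<Long> subtractor;

    ReduceModel(BinaryOperator<Long> adder, BinaryOperator<Long> subtractor) {
        this.adder = adder;
        this.subtractor = subtractor;
    }

    /** Applies one change (newValue, oldValue) for a key; returns the new aggregate. */
    Long process(String key, Long newValue, Long oldValue) {
        Long agg = store.get(key);
        if (newValue != null) {
            // No stored aggregate: the new value becomes the aggregate as-is,
            // so the adder is never called for it.
            agg = (agg == null) ? newValue : adder.apply(agg, newValue);
        }
        if (oldValue != null) {
            // agg is still null here if the store had no entry for this key
            // and newValue was null, so the subtractor sees a null aggregate.
            agg = subtractor.apply(agg, oldValue);
        }
        store.put(key, agg);
        return agg;
    }

    public static void main(String[] args) {
        ReduceModel model = new ReduceModel(Long::sum, (agg, v) -> {
            if (agg == null) {
                throw new IllegalStateException("subtractor called with null aggregate");
            }
            return agg - v;
        });
        System.out.println(model.process("a", 5L, null)); // first value passes through
        System.out.println(model.process("a", 7L, 5L));   // add 7, then subtract 5
    }
}
```

In this model, calling process("b", null, 3L) against an empty store hits
the null-aggregate branch, which is the situation where an old value
arrives for a key the store knows nothing about.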

Thanks,
-John

On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <vas...@sulatskov.net>
wrote:

> Hi,
>
> I am doing some experiments with kafka-streams KGroupedTable
> aggregation, and admittedly I am not wiping data properly on each
> restart, partially because I also wonder what would happen if you
> change a streams topology without doing a proper reset.
>
> I've noticed that from time to time, kafka-streams
> KGroupedTable.reduce() can call the subtractor function with a null
> aggregator value, and if you try to work around that by interpreting
> a null aggregator value as zero for numeric values, you get an
> incorrect aggregation result.
>
> I do understand that the proper way of handling this is to do a reset
> on topology changes, but I'd like to understand if there's any
> legitimate case when kafka-streams can call an adder or a subtractor
> with null aggregator value, and should I plan for this, or should I
> interpret this as an invalid state, and terminate the application, and
> do a proper reset?
>
> Also, I can't seem to find a guide which explains when application
> reset is necessary. Intuitively it seems that it should be done every
> time a topology changes. Any other cases?
>
> I tried to debug where the null value comes from and it seems that
> KTableReduce.process() is getting called with Change<V> value with
> newValue == null, and some non-null oldValue, which leads to the
> subtractor being called with a null aggregator value. I wonder how it is
> possible to have an old value for a key without a new value (does it
> happen because of the auto commit interval?).
>
> I've also noticed that it's possible for an input value from a topic
> to bypass aggregation function entirely and be directly transmitted to
> the output in certain cases: oldAgg is null, newValue is not null and
> oldValue is null - in that case newValue will be transmitted directly
> to the output. I suppose it's the correct behaviour, but feels a bit
> weird nonetheless. And I've actually been able to observe this
> behaviour in practice. I suppose it's also caused by this happening
> right before a commit happens, and the message is sent to a changelog
> topic.
>
> Please can someone with more knowledge shed some light on these issues?
>
> --
> Best regards,
> Vasily Sulatskov
>
