Hi,

It seems that it wouldn't be that difficult to address: just don't
break Change(newVal, oldVal) into Change(newVal, null) /
Change(null, oldVal), and update the aggregator value in one .process()
call.
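
To make the idea concrete, here is a rough, self-contained sketch in plain
Java. It is not the actual KTableReduce code: the Change class, the
applySplit/applyCombined names and the integer-sum aggregate are all made up
for illustration, and it assumes the old and new value map to the same
grouping key. It applies add-then-remove in the same order as the reduce
logic quoted further down, and compares the aggregates that become
observable when the two halves are processed separately with the single
aggregate produced by one combined call.

```java
import java.util.ArrayList;
import java.util.List;

public class ChangeSketch {
    // Hypothetical stand-in for the Streams Change type: a new/old value pair.
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(Integer newValue, Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // One process() step: first add the new value, then remove the old one.
    // Assumes the aggregate is non-null whenever an old value is removed.
    static Integer apply(Integer agg, Change change) {
        Integer result = agg;
        if (change.newValue != null) {
            result = (result == null) ? change.newValue : result + change.newValue;
        }
        if (change.oldValue != null) {
            result = result - change.oldValue;
        }
        return result;
    }

    // Split handling: the retraction and the update are two separate steps,
    // so the aggregate after each step is observable.
    static List<Integer> applySplit(Integer agg, Change change) {
        List<Integer> observed = new ArrayList<>();
        Integer afterRetraction = apply(agg, new Change(null, change.oldValue));
        observed.add(afterRetraction);
        observed.add(apply(afterRetraction, new Change(change.newValue, null)));
        return observed;
    }

    // Combined handling: one step, so only the final aggregate is observable.
    static List<Integer> applyCombined(Integer agg, Change change) {
        List<Integer> observed = new ArrayList<>();
        observed.add(apply(agg, change));
        return observed;
    }

    public static void main(String[] args) {
        // Three keys at 732 each (aggregate 2196); one key moves to 751.
        Change update = new Change(751, 732);
        System.out.println(applySplit(2196, update));    // [1464, 2215]
        System.out.println(applyCombined(2196, update)); // [2215]
    }
}
```

In the split case the intermediate value 1464 is what could be committed
between the two messages; in the combined case it never exists.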

Would this change make sense?
On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> Vasily,
>
> yes, it can happen. As you noticed, both messages might be processed on
> different machines. Thus, Kafka Streams provides 'eventual consistency'
> guarantees.
>
>
> -Matthias
>
> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> > Hi John,
> >
> > Thanks a lot for your explanation. It does make much more sense now.
> >
> > I think the Jira issue is pretty well explained (with a reference to
> > this thread), and I've left my 2 cents in the pull request.
> >
> > You are right, I didn't notice that the repartition topic contains the same
> > message effectively twice. The 0/1 bytes are non-printable, so I didn't
> > see them when I used kafka-console-consumer. So I have a quick
> > suggestion here: wouldn't it make sense to change the 0 and 1 bytes to
> > something that has visible corresponding ASCII characters, say + and
> > -, as these messages are effectively commands to the reducer to execute
> > either an addition or a subtraction?
> >
> > On a more serious note, can you please explain the temporal aspects of
> > how change messages are handled? More specifically, is it guaranteed
> > that both Change(newValue, null) and Change(null, oldValue) are
> > handled before a new aggregated value is committed to an output topic?
> > Change(newValue, null) and Change(null, oldValue) are delivered as two
> > separate messages via a Kafka topic. When they are read from the
> > topic (possibly on a different machine, whose commit interval is
> > asynchronous to that of the machine that put these changes into the
> > topic), can it happen that a Change(newValue, null) is processed by a
> > KTableReduceProcessor, the value of the aggregator is updated and
> > committed to the changelog topic, and the Change(null, oldValue) is
> > processed only in the next commit interval? If I understand this
> > correctly, that would mean that an incorrect aggregated value will be
> > observed briefly in the aggregated table, before being eventually
> > corrected.
> >
> > Can that happen? Or am I missing something that would make it impossible?
> > On Fri, Jul 13, 2018 at 8:05 PM John Roesler <j...@confluent.io> wrote:
> >>
> >> Hi Vasily,
> >>
> >> I'm glad you're making me look at this; it's good homework for me!
> >>
> >> This is very non-obvious, but here's what happens:
> >>
> >> KStreamReduce is a Processor of (K, V) => (K, Change<V>). I.e., it emits
> >> new/old Change pairs as the value.
> >>
> >> Next is the Select (aka GroupBy). In the DSL code, this is the
> >> KTableRepartitionMap (we call it a repartition when you select a new key,
> >> since the new keys may belong to different partitions).
> >> KTableRepartitionMap is a processor that does two things:
> >> 1. it maps K => K1 (new keys) and V => V1 (new values)
> >> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> >> null)]
> >> In other words, it turns each Change event into two events: a retraction
> >> and an update
> >>
> >> Next comes the reduce operation. In building the processor node for this
> >> operation, we create the sink, repartition topic, and source, followed by
> >> the actual Reduce node. So if you want to look at how the changes get
> >> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
> >> You'll see that the sink and source use a ChangedSerializer and
> >> ChangedDeserializer.
> >>
> >> By looking into those implementations, I found that they depend on each
> >> Change containing just one of new OR old. They serialize the underlying
> >> value using the serde you provide, along with a single byte that signifies
> >> if the serialized value is the new or old value, which the deserializer
> >> uses on the receiving end to turn it back into a Change(new, null) or
> >> Change(null, old) as appropriate. This is why the repartition topic looks
> >> like it's just the raw data. It basically is, except for the magic byte.
> >>
> >> Does that make sense?
> >>
> >> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> >> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
> >> leaving any feedback you have?
> >>
> >> Thanks,
> >> -John
> >>
> >> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov <vas...@sulatskov.net>
> >> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Thanks for your explanation.
> >>>
> >>> I have an answer to the practical question, i.e. a null aggregator
> >>> value should be interpreted as a fatal application error.
> >>>
> >>> On the other hand, looking at the app topology, I see that a message
> >>> from KSTREAM-REDUCE-0000000002 / "table" goes to
> >>> KTABLE-SELECT-0000000006 which in turn forwards data to
> >>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
> >>> this point I assume that data goes back to kafka into a *-repartition
> >>> topic, after that the message is read from kafka by
> >>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
> >>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
> >>> [aggregated-table]), where the actual aggregation takes place. What I
> >>> don't get is where this Change value comes from, I mean if it's been
> >>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
> >>> message goes through kafka where it gets serialized, and looking at
> >>> kafka "repartition" topic, it contains regular values, not a pair of
> >>> old/new.
> >>>
> >>> As far as I understand, Change is a purely in-memory representation of
> >>> the state for a particular key, and at no point is it serialized back
> >>> to Kafka, yet somehow these Change values make it to the reducer. I feel
> >>> like I am missing something here. Could you please clarify this?
> >>>
> >>> Can you please point me to the place in the kafka-streams sources where a
> >>> Change of newValue/oldValue is produced, so I can take a look? I
> >>> found the KTableReduce implementation, but can't find what produces these
> >>> Change values.
> >>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <j...@confluent.io> wrote:
> >>>>
> >>>> Hi again Vasily,
> >>>>
> >>>> Ok, it looks to me like this behavior is the result of the un-clean
> >>>> topology change.
> >>>>
> >>>> Just in case you're interested, here's what I think happened.
> >>>>
> >>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / "table")
> >>>> internally emits pairs of "oldValue"/"newValue". (Side note: it's by
> >>>> forwarding both the old and new value that we are able to maintain
> >>>> aggregates using the subtractor/adder pairs.)
> >>>>
> >>>> 2. In the full topology, these old/new pairs go through some
> >>>> transformations, but still in some form eventually make their way down to
> >>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
> >>>>
> >>>> 3. The reduce processor logic looks like this:
> >>>> final V oldAgg = store.get(key);
> >>>> V newAgg = oldAgg;
> >>>>
> >>>> // first try to add the new value
> >>>> if (value.newValue != null) {
> >>>>     if (newAgg == null) {
> >>>>         newAgg = value.newValue;
> >>>>     } else {
> >>>>         newAgg = addReducer.apply(newAgg, value.newValue);
> >>>>     }
> >>>> }
> >>>>
> >>>> // then try to remove the old value
> >>>> if (value.oldValue != null) {
> >>>>     // Here's where the assumption breaks down...
> >>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> >>>> }
> >>>>
> >>>> 4. Here's what I think happened. This processor saw an event like
> >>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first block,
> >>>> and (since "oldValue != null") would go into the second block. I think
> >>>> that in the normal case we can rely on the invariant that any value we
> >>>> get as an "oldValue" is one that we've previously seen (as "newValue").
> >>>> Consequently, we should be able to assume that if we get a non-null
> >>>> "oldValue", "newAgg" will also not be null (because we would have written
> >>>> it to the store back when we saw it as "newValue" and then retrieved it
> >>>> just now as "newAgg = oldAgg").
> >>>>
> >>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
> >>>> and KSTREAM-SINK-0000000013, gets added after (KSTREAM-REDUCE-0000000002 /
> >>>> "table") has already emitted some values, then we might in fact receive
> >>>> an event with some "oldValue" that we have in fact never seen before
> >>>> (because (KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the
> >>>> topology when it was first emitted as a "newValue").
> >>>>
> >>>> This would violate our assumption, and we would unintentionally send a
> >>>> "null" as the "newAgg" parameter to the "removeReducer" (aka subtractor).
> >>>> If you want to double-check my reasoning, you should be able to do so in
> >>>> the debugger with a breakpoint in KTableReduce.
> >>>>
> >>>>
> >>>> tl;dr: Supposing you reset the app when the topology changes, I think
> >>>> that you should be able to rely on non-null aggregates being passed in to
> >>>> *both* the adder and subtractor in a reduce.
> >>>>
> >>>> I would be in favor, as you suggested, of adding an explicit check and
> >>>> throwing an exception if the aggregate is ever null at those points. This
> >>>> would actually help us detect if the topology has changed unexpectedly
> >>>> and shut down, hopefully before any damage is done. I'll send a PR and see
> >>>> what everyone thinks.
> >>>>
> >>>> Does this all seem like it adds up to you?
> >>>> -John
> >>>>
> >>>>
> >>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov <vas...@sulatskov.net>
> >>>> wrote:
> >>>>
> >>>>> Hi John,
> >>>>>
> >>>>> Thanks for your reply. I am not sure if this behavior I've observed is
> >>>>> a bug or not, as I've not been resetting my application properly. On
> >>>>> the other hand if the subtractor or adder in the reduce operation are
> >>>>> never supposed to be called with null aggregator value, perhaps it
> >>>>> would make sense to put a null check in the table reduce
> >>>>> implementation to detect an application entering an invalid state. A
> >>>>> bit like a check for topics having the same number of partitions when
> >>>>> doing a join.
> >>>>>
> >>>>> Here's some information about my tests. Hope that can be useful:
> >>>>>
> >>>>> Topology at start:
> >>>>>
> >>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> >>>>>    Sub-topology: 0
> >>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>       --> KSTREAM-MAP-0000000001
> >>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000003
> >>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>
> >>>>>   Sub-topology: 1
> >>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>       --> KTABLE-TOSTREAM-0000000006
> >>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000007
> >>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> >>>>>       <-- KTABLE-TOSTREAM-0000000006
> >>>>>
> >>>>> This topology just takes data from the source topic "slope" which
> >>>>> produces messages like this:
> >>>>>
> >>>>> key1
> >>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>> key3
> >>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>> key2
> >>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>> key3
> >>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>> key1
> >>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>> key2
> >>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>
> >>>>> Every second, 3 new messages arrive from the "slope" topic for
> >>>>> keys key1, key2 and key3, with constantly increasing value.
> >>>>> Data is transformed so that the original key is also tracked in the
> >>>>> message value, grouped by key, and windowed with a custom window, and
> >>>>> reduced with a dummy reduce operation to make a KTable.
> >>>>> KTable is converted back to a stream, and sent to a topic (just for
> >>>>> debugging purposes).
> >>>>>
> >>>>> Here's the source (it's kafka-streams-scala for the most part). Also
> >>>>> please ignore silly classes, it's obviously a test:
> >>>>>
> >>>>>     val slopeTable = builder
> >>>>>       .stream[String, TimedValue]("slope")
> >>>>>       .map(
> >>>>>         (key, value) =>
> >>>>>           (
> >>>>>             StringWrapper(key),
> >>>>>             TimedValueWithKey(value = value.value, timestamp = value.timestamp, key = key)
> >>>>>           )
> >>>>>       )
> >>>>>       .groupByKey
> >>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> >>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
> >>>>>
> >>>>>     slopeTable.toStream
> >>>>>       .to("slope-table")
> >>>>>
> >>>>> Topology after change without a proper reset:
> >>>>>
> >>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> >>>>>    Sub-topology: 0
> >>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>       --> KSTREAM-MAP-0000000001
> >>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000003
> >>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>
> >>>>>   Sub-topology: 1
> >>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> >>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000007
> >>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000013
> >>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>     Sink: KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition)
> >>>>>       <-- KTABLE-SELECT-0000000006
> >>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> >>>>>       <-- KTABLE-TOSTREAM-0000000012
> >>>>>
> >>>>>   Sub-topology: 2
> >>>>>     Source: KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition])
> >>>>>       --> KTABLE-REDUCE-0000000009
> >>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> >>>>>       --> KTABLE-TOSTREAM-0000000010
> >>>>>       <-- KSTREAM-SOURCE-0000000008
> >>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> >>>>>       --> KSTREAM-SINK-0000000011
> >>>>>       <-- KTABLE-REDUCE-0000000009
> >>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> >>>>>       <-- KTABLE-TOSTREAM-0000000010
> >>>>>
> >>>>> Here's the source of the sub-topology that does table aggregation:
> >>>>>
> >>>>>     slopeTable
> >>>>>       .groupBy(
> >>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"), key.window()), value)
> >>>>>       )
> >>>>>       .reduceMat(adder = (aggValue, newValue) => {
> >>>>>         log.info(s"Called ADD: newValue=$newValue aggValue=$aggValue")
> >>>>>         val agg = Option(aggValue)
> >>>>>         TimedValueWithKey(
> >>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
> >>>>>           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
> >>>>>           key = "reduced"
> >>>>>         )
> >>>>>       }, subtractor = (aggValue, newValue) => {
> >>>>>         log.info(s"Called SUB: newValue=$newValue aggValue=$aggValue")
> >>>>>         val agg = Option(aggValue)
> >>>>>         TimedValueWithKey(
> >>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
> >>>>>           timestamp = Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp), newValue.timestamp),
> >>>>>           key = "reduced"
> >>>>>         )
> >>>>>       }, "aggregated-table")
> >>>>>       .toStream
> >>>>>       .to("slope-aggregated-table")
> >>>>>
> >>>>> I log all calls to the adder and subtractor, so I am able to see what's
> >>>>> going on there. I also track the original keys of the aggregated
> >>>>> values and their timestamps, so it's relatively easy to see how the
> >>>>> data goes through this topology.
> >>>>>
> >>>>> In order to reproduce this behavior I need to:
> >>>>> 1. Start a full topology (with table aggregation)
> >>>>> 2. Start without table aggregation (no app reset)
> >>>>> 3. Start with table aggregation (no app reset)
> >>>>>
> >>>>> Below is an interpretation of the adder/subtractor logs for a given
> >>>>> key/window in chronological order:
> >>>>>
> >>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> >>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> >>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> >>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> >>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> >>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> >>>>>
> >>>>> And in the end the last value that's materialized for that window
> >>>>> (i.e. windowed key) in the Kafka topic is 57, i.e. the increase in value
> >>>>> for a single key between some point in the middle of the window and
> >>>>> the end of the window (751 - 732 = 19), times 3. As opposed to the
> >>>>> expected value of 751 * 3 = 2253 (sum of last values in a time window
> >>>>> for all keys being aggregated).
> >>>>>
> >>>>> It's clear to me that I should do an application reset, but I also
> >>>>> would like to understand, should I expect adder/subtractor being
> >>>>> called with null aggValue, or is it a clear sign that something went
> >>>>> horribly wrong?
> >>>>>
> >>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io> wrote:
> >>>>>>
> >>>>>> Hi Vasily,
> >>>>>>
> >>>>>> Thanks for the email.
> >>>>>>
> >>>>>> To answer your question: you should reset the application basically
> >>>>>> any time you change the topology. Some transitions are safe, but others
> >>>>>> will result in data loss or corruption. Rather than try to reason about
> >>>>>> which is which, it's much safer just to either reset the app or not
> >>>>>> change it (if it has important state).
> >>>>>>
> >>>>>> Beyond changes that you make to the topology, we spend a lot of
> >>>>>> effort to try and make sure that different versions of Streams will
> >>>>>> produce the same topology, so unless the release notes say otherwise,
> >>>>>> you should be able to upgrade without a reset.
> >>>>>>
> >>>>>>
> >>>>>> I can't say right now whether those wacky behaviors are bugs or the
> >>>>>> result of changing the topology without a reset. Or if they are correct
> >>>>>> but surprising behavior somehow. I'll look into it tomorrow. Do feel
> >>>>>> free to open a Jira ticket if you think you have found a bug, especially
> >>>>>> if you can describe a repro. Knowing your topology before and after the
> >>>>>> change would also be immensely helpful. You can print it with
> >>>>>> Topology.describe().
> >>>>>>
> >>>>>> Regardless, I'll make a note to take a look at the code tomorrow and
> >>>>>> try to decide if you should expect these behaviors with "clean"
> >>>>>> topology changes.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <vas...@sulatskov.net>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I am doing some experiments with kafka-streams KGroupedTable
> >>>>>>> aggregation, and admittedly I am not wiping data properly on each
> >>>>>>> restart, partially because I also wonder what would happen if you
> >>>>>>> change a streams topology without doing a proper reset.
> >>>>>>>
> >>>>>>> I've noticed that from time to time, kafka-streams
> >>>>>>> KGroupedTable.reduce() can call the subtractor function with a null
> >>>>>>> aggregator value, and if you try to work around that by
> >>>>>>> interpreting a null aggregator value as zero for a numeric value, you
> >>>>>>> get an incorrect aggregation result.
> >>>>>>>
> >>>>>>> I do understand that the proper way of handling this is to do a
> >>>>>>> reset on topology changes, but I'd like to understand if there's any
> >>>>>>> legitimate case when kafka-streams can call an adder or a
> >>>>>>> subtractor with a null aggregator value. Should I plan for this, or
> >>>>>>> should I interpret this as an invalid state, terminate the
> >>>>>>> application, and do a proper reset?
> >>>>>>>
> >>>>>>> Also, I can't seem to find a guide which explains when an application
> >>>>>>> reset is necessary. Intuitively it seems that it should be done
> >>>>>>> every time a topology changes. Any other cases?
> >>>>>>>
> >>>>>>> I tried to debug where the null value comes from and it seems that
> >>>>>>> KTableReduce.process() is getting called with a Change<V> value with
> >>>>>>> newValue == null and some non-null oldValue, which leads to the
> >>>>>>> subtractor being called with a null aggregator value. I wonder how
> >>>>>>> it is possible to have an old value for a key without a new value
> >>>>>>> (does it happen because of the auto commit interval?).
> >>>>>>>
> >>>>>>> I've also noticed that it's possible for an input value from a
> >>>>>>> topic to bypass the aggregation function entirely and be directly
> >>>>>>> transmitted to the output in certain cases: oldAgg is null, newValue
> >>>>>>> is not null and oldValue is null. In that case newValue will be
> >>>>>>> transmitted directly to the output. I suppose it's the correct
> >>>>>>> behaviour, but it feels a bit weird nonetheless. And I've actually
> >>>>>>> been able to observe this behaviour in practice. I suppose it's also
> >>>>>>> caused by this happening right before a commit happens, and the
> >>>>>>> message is sent to a changelog topic.
> >>>>>>>
> >>>>>>> Please can someone with more knowledge shed some light on these
> >>>>>>> issues?
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best regards,
> >>>>>>> Vasily Sulatskov
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Best regards,
> >>>>> Vasily Sulatskov
> >>>>>
> >>>
> >>>
> >>>
> >>> --
> >>> Best regards,
> >>> Vasily Sulatskov
> >>>
> >
> >
> >
>


-- 
Best regards,
Vasily Sulatskov
