Thank you everyone for your explanations, that's been most enlightening.
On Wed, Jul 18, 2018 at 2:28 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> I see -- sorry for miss-understanding initially.
>
> I agree that it would be possible to detect. Feel free to file a Jira
> for this improvement and maybe pick it up by yourself :)
>
>
> -Matthias
>
> On 7/17/18 3:01 PM, Vasily Sulatskov wrote:
> > Hi,
> >
> > I do understand that in a general case it's not possible to guarantee
> > that newValue and oldValue parts of a Change message arrive to the
> > same partitions, and I guess that's not really in the plans, but if I
> > correctly understand how it works, it should be possible to detect if
> > both newValue and oldValue go to the same partition and keep them
> > together, thus improving kafka-streams consistency guarantees. Right?
> >
> > For example right now I have such a usecase that when I perform
> > groupBy on a table, my new keys are computed purely from old keys, and
> > not from the value. And handling of such cases (not a general case)
> > can be improved.
> > On Tue, Jul 17, 2018 at 1:48 AM Matthias J. Sax <matth...@confluent.io> 
> > wrote:
> >>
> >> It is not possible to use a single message, because both messages may go
> >> to different partitions and may be processed by different applications
> >> instances.
> >>
> >> Note, that the overall KTable state is sharded. Updating a single
> >> upstream shard, might required to update two different downstream shards.
> >>
> >>
> >> -Matthias
> >>
> >> On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
> >>> Hi,
> >>>
> >>> It seems that it wouldn't be that difficult to address: just don't
> >>> break Change(newVal, oldVal) into Change(newVal, null) /
> >>> Change(oldVal, null) and update aggregator value in one .process()
> >>> call.
> >>>
> >>> Would this change make sense?
> >>> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax <matth...@confluent.io> 
> >>> wrote:
> >>>>
> >>>> Vasily,
> >>>>
> >>>> yes, it can happen. As you noticed, both messages might be processed on
> >>>> different machines. Thus, Kafka Streams provides 'eventual consistency'
> >>>> guarantees.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> >>>>> Hi John,
> >>>>>
> >>>>> Thanks a lot for you explanation. It does make much more sense now.
> >>>>>
> >>>>> The Jira issue I think is pretty well explained (with a reference to
> >>>>> this thread). And I've lest my 2 cents in the pull request.
> >>>>>
> >>>>> You are right I didn't notice that repartition topic contains the same
> >>>>> message effectively twice, and 0/1 bytes are non-visible, so when I
> >>>>> used kafka-console-consumer I didn't notice that. So I have a quick
> >>>>> suggestion here, wouldn't it make sense to change 0 and 1 bytes to
> >>>>> something that has visible corresponding ascii characters, say + and
> >>>>> -, as these messages are effectively commands to reducer to execute
> >>>>> either an addition or subtraction?
> >>>>>
> >>>>> On a more serious, side, can you please explain temporal aspects of
> >>>>> how change messages are handled? More specifically, is it guaranteed
> >>>>> that both Change(newValue, null) and Change(null, oldValue) are
> >>>>> handled before a new aggregated value is comitted to an output topic?
> >>>>> Change(newValue, null) and Change(null, oldValue) are delivered as two
> >>>>> separate messages via a kafka topic, and when they are read from a
> >>>>> topic (possibly on a different machine where a commit interval is
> >>>>> asynchronous to a machine that's put these changes into a topic) can
> >>>>> it happen so a Change(newValue, null) is processed by a
> >>>>> KTableReduceProcessor, the value of the aggregator is updated, and
> >>>>> committed to the changelog topic, and a Change(null, oldValue) is
> >>>>> processed only in the next commit interval? If I am understand this
> >>>>> correctly that would mean that in an aggregated table an incorrect
> >>>>> aggregated value will be observed briefly, before being eventually
> >>>>> corrected.
> >>>>>
> >>>>> Can that happen? Or I can't see something that would make it impossible?
> >>>>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler <j...@confluent.io> wrote:
> >>>>>>
> >>>>>> Hi Vasily,
> >>>>>>
> >>>>>> I'm glad you're making me look at this; it's good homework for me!
> >>>>>>
> >>>>>> This is very non-obvious, but here's what happens:
> >>>>>>
> >>>>>> KStreamsReduce is a Processor of (K, V) => (K, Change<V>) . I.e., it 
> >>>>>> emits
> >>>>>> new/old Change pairs as the value.
> >>>>>>
> >>>>>> Next is the Select (aka GroupBy). In the DSL code, this is the
> >>>>>> KTableRepartitionMap (we call it a repartition when you select a new 
> >>>>>> key,
> >>>>>> since the new keys may belong to different partitions).
> >>>>>> KTableRepartitionMap is a processor that does two things:
> >>>>>> 1. it maps K => K1 (new keys) and V => V1 (new values)
> >>>>>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> >>>>>> null)]
> >>>>>> In other words, it turns each Change event into two events: a 
> >>>>>> retraction
> >>>>>> and an update
> >>>>>>
> >>>>>> Next comes the reduce operation. In building the processor node for 
> >>>>>> this
> >>>>>> operation, we create the sink, repartition topic, and source, followed 
> >>>>>> by
> >>>>>> the actual Reduce node. So if you want to look at how the changes get
> >>>>>> serialized and desesrialized, it's in KGroupedTableImpl#buildAggregate.
> >>>>>> You'll see that sink and source a ChangedSerializer and 
> >>>>>> ChangedDeserializer.
> >>>>>>
> >>>>>> By looking into those implementations, I found that they depend on each
> >>>>>> Change containing just one of new OR old. They serialize the underlying
> >>>>>> value using the serde you provide, along with a single byte that 
> >>>>>> signifies
> >>>>>> if the serialized value is the new or old value, which the deserializer
> >>>>>> uses on the receiving end to turn it back into a Change(new, null) or
> >>>>>> Change(null, old) as appropriate. This is why the repartition topic 
> >>>>>> looks
> >>>>>> like it's just the raw data. It basically is, except for the magic 
> >>>>>> byte.
> >>>>>>
> >>>>>> Does that make sense?
> >>>>>>
> >>>>>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> >>>>>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look 
> >>>>>> and
> >>>>>> leaving any feedback you have?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov 
> >>>>>> <vas...@sulatskov.net>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi John,
> >>>>>>>
> >>>>>>> Thanks for your explanation.
> >>>>>>>
> >>>>>>> I have an answer to the practical question, i.e. a null aggregator
> >>>>>>> value should be interpreted as a fatal application error.
> >>>>>>>
> >>>>>>> On the other hand, looking at the app topology, I see that a message
> >>>>>>> from KSTREAM-REDUCE-0000000002 / "table" goes goes to
> >>>>>>> KTABLE-SELECT-0000000006 which in turn forwards data to
> >>>>>>> KSTREAM-SINK-0000000007 (topic: aggregated-table-repartition), and at
> >>>>>>> this point I assume that data goes back to kafka into a *-repartition
> >>>>>>> topic, after that the message is read from kafka by
> >>>>>>> KSTREAM-SOURCE-0000000008 (topics: [aggregated-table-repartition]),
> >>>>>>> and finally gets to Processor: KTABLE-REDUCE-0000000009 (stores:
> >>>>>>> [aggregated-table]), where the actual aggregation takes place. What I
> >>>>>>> don't get is where this Change value comes from, I mean if it's been
> >>>>>>> produced by KSTREAM-REDUCE-0000000002, but it shouldn't matter as the
> >>>>>>> message goes through kafka where it gets serialized, and looking at
> >>>>>>> kafka "repartition" topic, it contains regular values, not a pair of
> >>>>>>> old/new.
> >>>>>>>
> >>>>>>> As far as I understand, Change is a purely in-memory representation of
> >>>>>>> the state for a particular key, and at no point it's serialized back
> >>>>>>> to kafka, yet somehow this Change values makes it to reducer. I feel
> >>>>>>> like I am missing something here. Could you please clarify this?
> >>>>>>>
> >>>>>>> Can you please point me to a place in kafka-streams sources where a
> >>>>>>> Change of newValue/oldValue is produced, so I could take a look? I
> >>>>>>> found KTableReduce implementation, but can't find who makes these
> >>>>>>> Change value.
> >>>>>>> On Fri, Jul 13, 2018 at 6:17 PM John Roesler <j...@confluent.io> 
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi again Vasily,
> >>>>>>>>
> >>>>>>>> Ok, it looks to me like this behavior is the result of the un-clean
> >>>>>>>> topology change.
> >>>>>>>>
> >>>>>>>> Just in case you're interested, here's what I think happened.
> >>>>>>>>
> >>>>>>>> 1. Your reduce node in subtopology1 (KSTREAM-REDUCE-0000000002 / 
> >>>>>>>> "table"
> >>>>>>> )
> >>>>>>>> internally emits pairs of "oldValue"/"newValue" . (side-note: It's by
> >>>>>>>> forwarding both the old and new value that we are able to maintain
> >>>>>>>> aggregates using the subtractor/adder pairs)
> >>>>>>>>
> >>>>>>>> 2. In the full topology, these old/new pairs go through some
> >>>>>>>> transformations, but still in some form eventually make their way 
> >>>>>>>> down to
> >>>>>>>> the reduce node (KTABLE-REDUCE-0000000009/"aggregated-table").
> >>>>>>>>
> >>>>>>>> 3. The reduce processor logic looks like this:
> >>>>>>>> final V oldAgg = store.get(key);
> >>>>>>>> V newAgg = oldAgg;
> >>>>>>>>
> >>>>>>>> // first try to add the new value
> >>>>>>>> if (value.newValue != null) {
> >>>>>>>>     if (newAgg == null) {
> >>>>>>>>         newAgg = value.newValue;
> >>>>>>>>     } else {
> >>>>>>>>         newAgg = addReducer.apply(newAgg, value.newValue);
> >>>>>>>>     }
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> // then try to remove the old value
> >>>>>>>> if (value.oldValue != null) {
> >>>>>>>>     // Here's where the assumption breaks down...
> >>>>>>>>     newAgg = removeReducer.apply(newAgg, value.oldValue);
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> 4. Here's what I think happened. This processor saw an event like
> >>>>>>>> {new=null, old=(key2, 732, 10:50:40)}. This would skip the first 
> >>>>>>>> block,
> >>>>>>> and
> >>>>>>>> (since "oldValue != null") would go into the second block. I think 
> >>>>>>>> that
> >>>>>>> in
> >>>>>>>> the normal case we can rely on the invariant that any value we get 
> >>>>>>>> as an
> >>>>>>>> "oldValue" is one that we've previously seen ( as "newValue" ).
> >>>>>>>> Consequently, we should be able to assume that if we get a non-null
> >>>>>>>> "oldValue", "newAgg" will also not be null (because we would have 
> >>>>>>>> written
> >>>>>>>> it to the store back when we saw it as "newValue" and then retrieved 
> >>>>>>>> it
> >>>>>>>> just now as "newAgg = oldAgg").
> >>>>>>>>
> >>>>>>>> However, if subtopology2, along with KTABLE-SELECT-0000000006
> >>>>>>>> and KSTREAM-SINK-0000000013 get added after 
> >>>>>>>> (KSTREAM-REDUCE-0000000002 /
> >>>>>>>> "table") has already emitted some values, then we might in fact 
> >>>>>>>> receive
> >>>>>>> an
> >>>>>>>> event with some "oldValue" that we have in fact never seen before
> >>>>>>> (because (
> >>>>>>>> KTABLE-REDUCE-0000000009/"aggregated-table") wasn't in the topology 
> >>>>>>>> when
> >>>>>>> it
> >>>>>>>> was first emitted as a "newValue").
> >>>>>>>>
> >>>>>>>> This would violate our assumption, and we would unintentionally send 
> >>>>>>>> a
> >>>>>>>> "null" as the "newAgg" parameter to the "removeReducer" (aka 
> >>>>>>>> subtractor).
> >>>>>>>> If you want to double-check my reasoning, you should be able to do 
> >>>>>>>> so in
> >>>>>>>> the debugger with a breakpoint in KTableReduce.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> tl;dr: Supposing you reset the app when the topology changes, I think
> >>>>>>> that
> >>>>>>>> you should be able to rely on non-null aggregates being passed in to
> >>>>>>> *both*
> >>>>>>>> the adder and subtractor in a reduce.
> >>>>>>>>
> >>>>>>>> I would be in favor, as you suggested, of adding an explicit check 
> >>>>>>>> and
> >>>>>>>> throwing an exception if the aggregate is ever null at those points. 
> >>>>>>>> This
> >>>>>>>> would actually help us detect if the topology has changed 
> >>>>>>>> unexpectedly
> >>>>>>> and
> >>>>>>>> shut down, hopefully before any damage is done. I'll send a PR and 
> >>>>>>>> see
> >>>>>>> what
> >>>>>>>> everyone thinks.
> >>>>>>>>
> >>>>>>>> Does this all seem like it adds up to you?
> >>>>>>>> -John
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Jul 13, 2018 at 4:06 AM Vasily Sulatskov 
> >>>>>>>> <vas...@sulatskov.net>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi John,
> >>>>>>>>>
> >>>>>>>>> Thanks for your reply. I am not sure if this behavior I've observed 
> >>>>>>>>> is
> >>>>>>>>> a bug or not, as I've not been resetting my application properly. On
> >>>>>>>>> the other hand if the subtractor or adder in the reduce operation 
> >>>>>>>>> are
> >>>>>>>>> never supposed to be called with null aggregator value, perhaps it
> >>>>>>>>> would make sense to put a null check in the table reduce
> >>>>>>>>> implementation to detect an application entering an invalid state. A
> >>>>>>>>> bit like a check for topics having the same number of partitions 
> >>>>>>>>> when
> >>>>>>>>> doing a join.
> >>>>>>>>>
> >>>>>>>>> Here's some information about my tests. Hope that can be useful:
> >>>>>>>>>
> >>>>>>>>> Topology at start:
> >>>>>>>>>
> >>>>>>>>> 2018-07-13 10:29:48 [main] INFO  TableAggregationTest - Topologies:
> >>>>>>>>>    Sub-topology: 0
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>>>>>       --> KSTREAM-MAP-0000000001
> >>>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000003
> >>>>>>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>>>>>
> >>>>>>>>>   Sub-topology: 1
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>>>>>       --> KTABLE-TOSTREAM-0000000006
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000007
> >>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: slope-table)
> >>>>>>>>>       <-- KTABLE-TOSTREAM-0000000006
> >>>>>>>>>
> >>>>>>>>> This topology just takes data from the source topic "slope" which
> >>>>>>>>> produces messages like this:
> >>>>>>>>>
> >>>>>>>>> key1
> >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>>>> key3
> >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>>>> key2
> >>>>>>>>> {"value":187,"timestamp":"2018-07-13T10:28:14.188+02:00[Europe/Paris]"}
> >>>>>>>>> key3
> >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>>>> key1
> >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>>>> key2
> >>>>>>>>> {"value":188,"timestamp":"2018-07-13T10:28:15.187+02:00[Europe/Paris]"}
> >>>>>>>>>
> >>>>>>>>> Every second, there are 3 new messages arrive from "slope" topic for
> >>>>>>>>> keys key1, key2 and key3, with constantly increasing value.
> >>>>>>>>> Data is transformed so that the original key is also tracked in the
> >>>>>>>>> message value, grouped by key, and windowed with a custom window, 
> >>>>>>>>> and
> >>>>>>>>> reduced with a dummy reduce operation to make a KTable.
> >>>>>>>>> KTable is converted back to a stream, and sent to a topic (just for
> >>>>>>>>> debugging purposes).
> >>>>>>>>>
> >>>>>>>>> Here's the source (it's kafka-streams-scala for the most part). Also
> >>>>>>>>> please ignore silly classes, it's obviously a test:
> >>>>>>>>>
> >>>>>>>>>     val slopeTable = builder
> >>>>>>>>>       .stream[String, TimedValue]("slope")
> >>>>>>>>>       .map(
> >>>>>>>>>         (key, value) =>
> >>>>>>>>>           (
> >>>>>>>>>             StringWrapper(key),
> >>>>>>>>>             TimedValueWithKey(value = value.value, timestamp =
> >>>>>>>>> value.timestamp, key = key)
> >>>>>>>>>         )
> >>>>>>>>>       )
> >>>>>>>>>       .groupByKey
> >>>>>>>>>       .windowedBy(new RoundedWindows(ChronoUnit.MINUTES, 1))
> >>>>>>>>>       .reduceMat((aggValue, newValue) => newValue, "table")
> >>>>>>>>>
> >>>>>>>>>     slopeTable.toStream
> >>>>>>>>>       .to("slope-table")
> >>>>>>>>>
> >>>>>>>>> Topology after change without a proper reset:
> >>>>>>>>>
> >>>>>>>>> 2018-07-13 10:38:32 [main] INFO  TableAggregationTest - Topologies:
> >>>>>>>>>    Sub-topology: 0
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000000 (topics: [slope])
> >>>>>>>>>       --> KSTREAM-MAP-0000000001
> >>>>>>>>>     Processor: KSTREAM-MAP-0000000001 (stores: [])
> >>>>>>>>>       --> KSTREAM-FILTER-0000000004
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000000
> >>>>>>>>>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000003
> >>>>>>>>>       <-- KSTREAM-MAP-0000000001
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000003 (topic: table-repartition)
> >>>>>>>>>       <-- KSTREAM-FILTER-0000000004
> >>>>>>>>>
> >>>>>>>>>   Sub-topology: 1
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000005 (topics: [table-repartition])
> >>>>>>>>>       --> KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Processor: KSTREAM-REDUCE-0000000002 (stores: [table])
> >>>>>>>>>       --> KTABLE-SELECT-0000000006, KTABLE-TOSTREAM-0000000012
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000005
> >>>>>>>>>     Processor: KTABLE-SELECT-0000000006 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000007
> >>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000013
> >>>>>>>>>       <-- KSTREAM-REDUCE-0000000002
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000007 (topic: 
> >>>>>>>>> aggregated-table-repartition)
> >>>>>>>>>       <-- KTABLE-SELECT-0000000006
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000013 (topic: slope-table)
> >>>>>>>>>       <-- KTABLE-TOSTREAM-0000000012
> >>>>>>>>>
> >>>>>>>>>   Sub-topology: 2
> >>>>>>>>>     Source: KSTREAM-SOURCE-0000000008 (topics:
> >>>>>>>>> [aggregated-table-repartition])
> >>>>>>>>>       --> KTABLE-REDUCE-0000000009
> >>>>>>>>>     Processor: KTABLE-REDUCE-0000000009 (stores: [aggregated-table])
> >>>>>>>>>       --> KTABLE-TOSTREAM-0000000010
> >>>>>>>>>       <-- KSTREAM-SOURCE-0000000008
> >>>>>>>>>     Processor: KTABLE-TOSTREAM-0000000010 (stores: [])
> >>>>>>>>>       --> KSTREAM-SINK-0000000011
> >>>>>>>>>       <-- KTABLE-REDUCE-0000000009
> >>>>>>>>>     Sink: KSTREAM-SINK-0000000011 (topic: slope-aggregated-table)
> >>>>>>>>>       <-- KTABLE-TOSTREAM-0000000010
> >>>>>>>>>
> >>>>>>>>> Here's the source of the sub-topology that does table aggregation:
> >>>>>>>>>
> >>>>>>>>>     slopeTable
> >>>>>>>>>       .groupBy(
> >>>>>>>>>         (key, value) => (new Windowed(StringWrapper("dummykey"),
> >>>>>>>>> key.window()), value)
> >>>>>>>>>       )
> >>>>>>>>>       .reduceMat(adder = (aggValue, newValue) => {
> >>>>>>>>>         log.info(s"Called ADD: newValue=$newValue 
> >>>>>>>>> aggValue=$aggValue")
> >>>>>>>>>         val agg = Option(aggValue)
> >>>>>>>>>         TimedValueWithKey(
> >>>>>>>>>           value = agg.map(_.value).getOrElse(0) + newValue.value,
> >>>>>>>>>           timestamp =
> >>>>>>>>>
> >>>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >>>>>>>>> newValue.timestamp),
> >>>>>>>>>           key = "reduced"
> >>>>>>>>>         )
> >>>>>>>>>       }, subtractor = (aggValue, newValue) => {
> >>>>>>>>>         log.info(s"Called SUB: newValue=$newValue 
> >>>>>>>>> aggValue=$aggValue")
> >>>>>>>>>         val agg = Option(aggValue)
> >>>>>>>>>         TimedValueWithKey(
> >>>>>>>>>           value = agg.map(_.value).getOrElse(0) - newValue.value,
> >>>>>>>>>           timestamp =
> >>>>>>>>>
> >>>>>>>>> Utils.latestTimestamp(agg.map(_.timestamp).getOrElse(zeroTimestamp),
> >>>>>>>>> newValue.timestamp),
> >>>>>>>>>           key = "reduced"
> >>>>>>>>>         )
> >>>>>>>>>       }, "aggregated-table")
> >>>>>>>>>       .toStream
> >>>>>>>>>       .to("slope-aggregated-table")
> >>>>>>>>>
> >>>>>>>>> I log all calls to adder and subtractor, so I am able to see what's
> >>>>>>>>> going on there, as well as I track the original keys of the 
> >>>>>>>>> aggregated
> >>>>>>>>> values and their timestamps, so it's relatively easy to see how the
> >>>>>>>>> data goes through this topology
> >>>>>>>>>
> >>>>>>>>> In order to reproduce this behavior I need to:
> >>>>>>>>> 1. Start a full topology (with table aggregation)
> >>>>>>>>> 2. Start without table aggregation (no app reset)
> >>>>>>>>> 3. Start with table aggregation (no app reset)
> >>>>>>>>>
> >>>>>>>>> Bellow is an interpretation of the adder/subtractor logs for a given
> >>>>>>>>> key/window in the chronological order
> >>>>>>>>>
> >>>>>>>>> SUB: newValue=(key2, 732, 10:50:40) aggValue=null
> >>>>>>>>> ADD: newValue=(key2, 751, 10:50:59) aggValue=(-732, 10:50:40)
> >>>>>>>>> SUB: newValue=(key1, 732, 10:50:40) aggValue=(19, 10:50:59)
> >>>>>>>>> ADD: newValue=(key1, 751, 10:50:59) aggValue=(-713, 10:50:59)
> >>>>>>>>> SUB: newValue=(key3, 732, 10:50:40) aggValue=(38, 10:50:59)
> >>>>>>>>> ADD: newValue=(key3, 751, 10:50:59) aggValue=(-694, 10:50:59)
> >>>>>>>>>
> >>>>>>>>> And in the end the last value that's materialized for that window
> >>>>>>>>> (i.e. windowed key) in the kafka topic is 57, i.e. a increase in 
> >>>>>>>>> value
> >>>>>>>>> for a single key between some point in the middle of the window and 
> >>>>>>>>> at
> >>>>>>>>> the end of the window, times 3. As opposed to the expected value of
> >>>>>>>>> 751 * 3 = 2253 (sum of last values in a time window for all keys 
> >>>>>>>>> being
> >>>>>>>>> aggregated).
> >>>>>>>>>
> >>>>>>>>> It's clear to me that I should do an application reset, but I also
> >>>>>>>>> would like to understand, should I expect adder/subtractor being
> >>>>>>>>> called with null aggValue, or is it a clear sign that something went
> >>>>>>>>> horribly wrong?
> >>>>>>>>>
> >>>>>>>>> On Fri, Jul 13, 2018 at 12:19 AM John Roesler <j...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Vasily,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the email.
> >>>>>>>>>>
> >>>>>>>>>> To answer your question: you should reset the application basically
> >>>>>>> any
> >>>>>>>>>> time you change the topology. Some transitions are safe, but others
> >>>>>>> will
> >>>>>>>>>> result in data loss or corruption. Rather than try to reason about
> >>>>>>> which
> >>>>>>>>> is
> >>>>>>>>>> which, it's much safer just to either reset the app or not change 
> >>>>>>>>>> it
> >>>>>>> (if
> >>>>>>>>> it
> >>>>>>>>>> has important state).
> >>>>>>>>>>
> >>>>>>>>>> Beyond changes that you make to the topology, we spend a lot of
> >>>>>>> effort to
> >>>>>>>>>> try and make sure that different versions of Streams will produce 
> >>>>>>>>>> the
> >>>>>>>>> same
> >>>>>>>>>> topology, so unless the release notes say otherwise, you should be
> >>>>>>> able
> >>>>>>>>> to
> >>>>>>>>>> upgrade without a reset.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I can't say right now whether those wacky behaviors are bugs or the
> >>>>>>>>> result
> >>>>>>>>>> of changing the topology without a reset. Or if they are correct 
> >>>>>>>>>> but
> >>>>>>>>>> surprising behavior somehow. I'll look into it tomorrow. Do feel
> >>>>>>> free to
> >>>>>>>>>> open a Jira ticket if you think you have found a bug, especially if
> >>>>>>> you
> >>>>>>>>> can
> >>>>>>>>>> describe a repro. Knowing your topology before and after the change
> >>>>>>> would
> >>>>>>>>>> also be immensely helpful. You can print it with 
> >>>>>>>>>> Topology.describe().
> >>>>>>>>>>
> >>>>>>>>>> Regardless, I'll make a note to take a look at the code tomorrow 
> >>>>>>>>>> and
> >>>>>>> try
> >>>>>>>>> to
> >>>>>>>>>> decide if you should expect these behaviors with "clean" topology
> >>>>>>>>> changes.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Jul 12, 2018 at 11:51 AM Vasily Sulatskov <
> >>>>>>> vas...@sulatskov.net>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I am doing some experiments with kafka-streams KGroupedTable
> >>>>>>>>>>> aggregation, and admittedly I am not wiping data properly on each
> >>>>>>>>>>> restart, partially because I also wonder what would happen if you
> >>>>>>>>>>> change a streams topology without doing a proper reset.
> >>>>>>>>>>>
> >>>>>>>>>>> I've noticed that from time to time, kafka-streams
> >>>>>>>>>>> KGroupedTable.reduce() can call subtractor function with null
> >>>>>>>>>>> aggregator value, and if you try to work around that, by
> >>>>>>> interpreting
> >>>>>>>>>>> null aggregator value as zero for numeric value you get incorrect
> >>>>>>>>>>> aggregation result.
> >>>>>>>>>>>
> >>>>>>>>>>> I do understand that the proper way of handling this is to do a
> >>>>>>> reset
> >>>>>>>>>>> on topology changes, but I'd like to understand if there's any
> >>>>>>>>>>> legitmate case when kafka-streams can call an adder or a
> >>>>>>> substractor
> >>>>>>>>>>> with null aggregator value, and should I plan for this, or should 
> >>>>>>>>>>> I
> >>>>>>>>>>> interpret this as an invalid state, and terminate the application,
> >>>>>>> and
> >>>>>>>>>>> do a proper reset?
> >>>>>>>>>>>
> >>>>>>>>>>> Also, I can't seem to find a guide which explains when application
> >>>>>>>>>>> reset is necessary. Intuitively it seems that it should be done
> >>>>>>> every
> >>>>>>>>>>> time a topology changes. Any other cases?
> >>>>>>>>>>>
> >>>>>>>>>>> I tried to debug where the null value comes from and it seems that
> >>>>>>>>>>> KTableReduce.process() is getting called with Change<V> value with
> >>>>>>>>>>> newValue == null, and some non-null oldValue. Which leads to and 
> >>>>>>>>>>> to
> >>>>>>>>>>> subtractor being called with null aggregator value. I wonder how
> >>>>>>> it is
> >>>>>>>>>>> possible to have an old value for a key without a new value (does
> >>>>>>> it
> >>>>>>>>>>> happen because of the auto commit interval?).
> >>>>>>>>>>>
> >>>>>>>>>>> I've also noticed that it's possible for an input value from a
> >>>>>>> topic
> >>>>>>>>>>> to bypass aggregation function entirely and be directly
> >>>>>>> transmitted to
> >>>>>>>>>>> the output in certain cases: oldAgg is null, newValue is not null
> >>>>>>> and
> >>>>>>>>>>> oldValue is null - in that case newValue will be transmitted
> >>>>>>> directly
> >>>>>>>>>>> to the output. I suppose it's the correct behaviour, but feels a
> >>>>>>> bit
> >>>>>>>>>>> weird nonetheless. And I've actually been able to observe this
> >>>>>>>>>>> behaviour in practice. I suppose it's also caused by this 
> >>>>>>>>>>> happening
> >>>>>>>>>>> right before a commit happens, and the message is sent to a
> >>>>>>> changelog
> >>>>>>>>>>> topic.
> >>>>>>>>>>>
> >>>>>>>>>>> Please can someone with more knowledge shed some light on these
> >>>>>>> issues?
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Best regards,
> >>>>>>>>>>> Vasily Sulatskov
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Best regards,
> >>>>>>>>> Vasily Sulatskov
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best regards,
> >>>>>>> Vasily Sulatskov
> >>>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
> >
>


--
Best regards,
Vasily Sulatskov

Reply via email to