[
https://issues.apache.org/jira/browse/KAFKA-9611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046798#comment-17046798
]
John Roesler commented on KAFKA-9611:
-------------------------------------
Hi [~neilgreen],
It seems like classifying this as a bug is going to be controversial. In any
case, I think you've made a good argument that it would be _better_ to apply
both updates "atomically" when possible. Can we instead classify this ticket as
an "improvement"?
I took a brief look at the code, and I think it may actually be possible to
apply this change without too much strife. The reason for the current behavior
is that in the general case, we may need to update two rows in the derived
table (as [~mjsax] said). Two different rows may be in two different
partitions, and therefore, we _must_ send two independent repartition messages.
Since the specific case where we only need to update one row (the derived key
hasn't changed) also "works" with the same logic, we just treat them the same.
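To make the general case concrete, here is a hypothetical topology in the
shape of this ticket's use case (the topic name, serdes, and variable names
are my own illustrative choices, not from the report):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;

final StreamsBuilder builder = new StreamsBuilder();

// hypothetical source topic: key = "id/signal", value = reading
final KTable<String, Integer> readings =
    builder.table("readings", Consumed.with(Serdes.String(), Serdes.Integer()));

// This selector derives the grouping key from the record key
// ("abc1234/signal1" -> "abc1234"), so the derived key never changes for a
// given source key. In the general case, though, the selector may also use
// the value, so an update can change the derived key, and the old and new
// derived keys can hash to different partitions of the repartition topic.
final KTable<String, Integer> totals = readings
    .groupBy(
        (key, value) -> KeyValue.pair(key.split("/")[0], value),
        Grouped.with(Serdes.String(), Serdes.Integer()))
    .reduce(Integer::sum, (agg, oldValue) -> agg - oldValue);
{code}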
The actual "splitting" takes place in
`org.apache.kafka.streams.kstream.internals.KTableRepartitionMap.KTableMapProcessor#process`,
where we do this:
{code:java}
final KeyValue<? extends K1, ? extends V1> newPair =
    change.newValue == null ? null : mapper.apply(key, change.newValue);
final KeyValue<? extends K1, ? extends V1> oldPair =
    change.oldValue == null ? null : mapper.apply(key, change.oldValue);

// if the selected repartition key or value is null, skip
// forward oldPair first, to be consistent with reduce and aggregate
if (oldPair != null && oldPair.key != null && oldPair.value != null) {
    context().forward(oldPair.key, new Change<>(null, oldPair.value));
}
if (newPair != null && newPair.key != null && newPair.value != null) {
    context().forward(newPair.key, new Change<>(newPair.value, null));
}
{code}
Then, we send it over the repartition topic using the ChangedSerializer (which
explicitly assumes that the Change has already been split into two updates):
{code:java}
// only one of the old / new values would be not null
if (data.newValue != null) {
    if (data.oldValue != null) {
        throw new StreamsException("Both old and new values are not null (" + data.oldValue
            + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
    }
    serializedKey = inner.serialize(topic, headers, data.newValue);
} else {
    if (data.oldValue == null) {
        throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
    }
    serializedKey = inner.serialize(topic, headers, data.oldValue);
}

final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
buf.put(serializedKey);
buf.put((byte) (data.newValue != null ? 1 : 0));

return buf.array();
{code}
Then, on the receiving end, we decode each message using the
ChangedDeserializer:
{code:java}
// strip the trailing "is new" flag byte to recover the serialized value
final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
System.arraycopy(data, 0, bytes, 0, bytes.length);

if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
    return new Change<>(inner.deserialize(topic, headers, bytes), null);
} else {
    return new Change<>(null, inner.deserialize(topic, headers, bytes));
}
{code}
And then finally, both updates pass (independently) through the aggregator.
Note that the logic works whether the Change is split into two updates [(old,
null),(null,new)] or is a whole Change (old, new).
{code:java}
// (for context: the preceding lines of KTableAggregate#process)
final ValueAndTimestamp<T> oldAggAndTimestamp = store.get(key);
final T oldAgg = getValueOrNull(oldAggAndTimestamp);
final T intermediateAgg;
long newTimestamp = context().timestamp();

// first try to remove the old value
if (value.oldValue != null && oldAgg != null) {
    intermediateAgg = remove.apply(key, value.oldValue, oldAgg);
    newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
} else {
    intermediateAgg = oldAgg;
}

// then try to add the new value
final T newAgg;
if (value.newValue != null) {
    final T initializedAgg;
    if (intermediateAgg == null) {
        initializedAgg = initializer.apply();
    } else {
        initializedAgg = intermediateAgg;
    }

    newAgg = add.apply(key, value.newValue, initializedAgg);
    if (oldAggAndTimestamp != null) {
        newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
    }
} else {
    newAgg = intermediateAgg;
}

// update the store with the new value
store.put(key, ValueAndTimestamp.make(newAgg, newTimestamp));
tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null, newTimestamp);
{code}
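To see where the ticket's surprising intermediate result comes from, here is a
worked trace of that logic with a hypothetical sum aggregator, using the
numbers from the report (abc1234/signal1 changes from 2 to 3 while the
aggregate for abc1234 is 6):
{code:java}
// Current behavior: the Change is split into two repartition records, and
// each pass through the aggregator forwards a result downstream.
int agg = 6;
agg = agg - 2;  // Change(null, 2): subtractor runs -> emits (abc1234, 4)
agg = agg + 3;  // Change(3, null): adder runs      -> emits (abc1234, 7)

// With a whole Change(3, 2), the same logic would run the subtractor and the
// adder in a single invocation and emit only the final result:
int combined = 6 - 2 + 3;  // -> emits (abc1234, 7) once
{code}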
So, it seems like the improvement would be to update
`org.apache.kafka.streams.kstream.internals.KTableRepartitionMap.KTableMapProcessor#process`
to check whether the old and new keys are actually the same, and in that case,
_not_ split the Change, then update the ChangedSerializer and
ChangedDeserializer to handle a third case in which both old and new values
are non-null. Luckily, we
used a whole byte to represent the "is new" flag, so we could add a new value
of "2" to represent "both old and new are encoded", in which case, we would use
the logic of the FullChangeSerde to encode both values. The actual aggregation
logic would continue to work without modification, and you'd get the behavior
you prefer.
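To illustrate (this is a rough, untested sketch, not a patch), the check in
`KTableMapProcessor#process` might look like:
{code:java}
// hypothetical: forward a single whole Change when the mapped key is unchanged
if (oldPair != null && newPair != null
        && oldPair.key != null && oldPair.key.equals(newPair.key)
        && oldPair.value != null && newPair.value != null) {
    context().forward(newPair.key, new Change<>(newPair.value, oldPair.value));
} else {
    // different derived keys (or one side is null): split as before
    if (oldPair != null && oldPair.key != null && oldPair.value != null) {
        context().forward(oldPair.key, new Change<>(null, oldPair.value));
    }
    if (newPair != null && newPair.key != null && newPair.value != null) {
        context().forward(newPair.key, new Change<>(newPair.value, null));
    }
}
{code}
And the third branch in the ChangedSerializer might write flag value 2. The
wire layout here ([oldLength][oldBytes][newBytes][flag]) is just one plausible
encoding, borrowing the length-prefix idea from FullChangeSerde:
{code:java}
// hypothetical: encode both values when both are present
if (data.oldValue != null && data.newValue != null) {
    final byte[] oldBytes = inner.serialize(topic, headers, data.oldValue);
    final byte[] newBytes = inner.serialize(topic, headers, data.newValue);
    final ByteBuffer buf = ByteBuffer.allocate(
        Integer.BYTES + oldBytes.length + newBytes.length + NEWFLAG_SIZE);
    buf.putInt(oldBytes.length);
    buf.put(oldBytes);
    buf.put(newBytes);
    buf.put((byte) 2);  // 2 = "both old and new values are encoded"
    return buf.array();
}
{code}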
However, there's one fly in the ointment... During an upgrade to this new
logic, we may have a situation where the new code encodes and sends a
"combined" Change message, but the old code is still running on the receiving
end. In that case, when the recipient gets a message with a flag of "2", it
won't even notice; it would apply the _current_ deserializer logic of:
{code:java}
if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
    return new Change<>(inner.deserialize(topic, headers, bytes), null);
}
{code}
resulting in the loss of the "old value", so the old value wouldn't get
subtracted from the aggregation result.
This is the problem we have to solve in order to implement this improvement:
how to ensure that the recipient (aggregation) task gets the updated code
before the sending (repartition) task starts encoding with the new format.
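For completeness, the receiving side would need something like the following
(again a hypothetical sketch, matching the wire layout assumed above). The
upgrade problem is precisely that this code must be running on every
aggregation task before any repartition task starts writing flag value 2:
{code:java}
final ByteBuffer buf = ByteBuffer.wrap(data);
final byte flag = buf.get(data.length - NEWFLAG_SIZE);

if (flag == 2) {
    // hypothetical: both values encoded as [oldLength][oldBytes][newBytes][flag]
    final byte[] oldBytes = new byte[buf.getInt()];
    buf.get(oldBytes);
    final byte[] newBytes =
        new byte[data.length - Integer.BYTES - oldBytes.length - NEWFLAG_SIZE];
    buf.get(newBytes);
    return new Change<>(
        inner.deserialize(topic, headers, newBytes),
        inner.deserialize(topic, headers, oldBytes));
}

// flags 0 and 1: the existing single-value decoding
final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
System.arraycopy(data, 0, bytes, 0, bytes.length);
if (flag != 0) {
    return new Change<>(inner.deserialize(topic, headers, bytes), null);
} else {
    return new Change<>(null, inner.deserialize(topic, headers, bytes));
}
{code}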
> KGroupedTable.aggregate(...) emits incorrect values
> ---------------------------------------------------
>
> Key: KAFKA-9611
> URL: https://issues.apache.org/jira/browse/KAFKA-9611
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Neil Green
> Priority: Major
>
> I've run into what appears to be undesirable behaviour in a streams app.
> I have a KTable produced from a topic. The table contains entries like
> "abc1234/signal1" : 1, "abc1234/signal2" : 3
> The key is "id/signal name" and the value is an int. I want to produce an
> aggregate KTable containing the sum of all the signals for a given id.
> So if the source KTable contains:
> {{+-----------------+---+}}
> {{| abc1234/signal1 | 2 |}}
> {{| abc1234/signal2 | 4 |}}
> {{| abc4566/signal1 | 3 |}}
> {{+-----------------+---+}}
> then the output should contain:
> {{+---------+---+}}
> {{| abc1234 | 6 |}}
> {{| abc4566 | 3 |}}
> {{+---------+---+}}
> On a change
> {{+-----------------+---+}}
> {{| abc1234/signal1 | 3 |}}
> {{+-----------------+---+}}
> I would expect the change
> {{+---------+---+}}
> {{| abc1234 | 7 |}}
> {{+---------+---+}}
> to be published.
> In fact, two changelog entries are published:
> {{+---------+---+}}
> {{| abc1234 | 4 |}} // This is incorrect. The sum of the signals is never 4.
> {{+---------+---+}}
> Then
> {{+---------+---+}}
> {{| abc1234 | 7 |}}
> {{+---------+---+}}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)