John Roesler created KAFKA-9503:
-----------------------------------
Summary: TopologyTestDriver processes intermediate results in the
wrong order
Key: KAFKA-9503
URL: https://issues.apache.org/jira/browse/KAFKA-9503
Project: Kafka
Issue Type: Bug
Reporter: John Roesler
Assignee: John Roesler
TopologyTestDriver processes each input synchronously, which resolves one of
the most significant challenges in verifying the correctness of streaming
applications.
When processing an input, it feeds that record to the source node. The record
is then passed synchronously (processing is always synchronous within a task)
through the subtopology via Context#forward calls. Ultimately, outputs from
that input are forwarded into the RecordCollector, which converts them to
Producer.send calls. In TopologyTestDriver, this Producer is a special one that
just captures the records.
Some output topics from one subtopology are inputs to another subtopology
(repartition topics, for example). Immediately after the synchronous
subtopology process() invocation, TopologyTestDriver iterates over the
collected outputs from the special Producer. If they are purely output records,
it just enqueues them for later retrieval by testing code. If they are records
for internal topics, though, TopologyTestDriver immediately processes them as
inputs for the relevant subtopology.
The problem, and this is very subtle, is that TopologyTestDriver does this
recursively, which with some (apparently rare) programs can cause the output to
be observed in an invalid order.
One such program is the one I wrote to test the fix for KAFKA-9487. It
involves a foreign-key join whose result is joined back to one of its inputs.
Here's a simplified version:
// foreign key join
J = A.join(B, (extractor) a -> a.b, (joiner) (a,b) -> new Pair(a, b))
// equi-join
OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))
Let's say we have the following initial condition:
A:
a1 = {v: X, b: b1}
B:
b1 = {v: Y}
J:
a1 = Pair({v: X, b: b1}, {v: Y})
OUT:
a1 = Pair({v: X, b: b1}, Pair({v: X, b: b1}, {v: Y}))
Now, piping an update:
a1: {v: Z, b: b1}
results immediately in two buffered results in the Producer:
(FK join subscription): b1: {a1}
(OUT): a1 = Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
Note that the FK join result isn't updated synchronously, since it's an async
operation, so the RHS lookup is temporarily incorrect, yielding the nonsense
intermediate result where the outer pair has the updated value for a1, but the
inner (fk result) one still has the old value for a1.
However, we don't buffer that output record for consumption by testing code
yet; we leave it in the internal Producer while we process the first
intermediate record (the FK subscription).
Processing that internal record means that we have a new internal record to
process:
(FK join subscription response): a1: {b1: {v: Y}}
so right now, our internal-records-to-process stack looks like:
(FK join subscription response): a1: {b1: {v: Y}}
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
Again, we start by processing the first thing, the FK join response, which
results in an updated FK join result:
(J) a1: Pair({v: Z, b: b1}, {v: Y})
and output:
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
and we still haven't handled the earlier output, so now our
internal-records-to-process stack looks like:
(J) a1: Pair({v: Z, b: b1}, {v: Y})
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
At this point, there's nothing else to process in internal topics, so we just
copy the records one by one to the "output" collection for later handling by
testing code, but this yields the wrong final state of:
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
That was an incorrect intermediate result, but because we're processing
internal records recursively (as a stack), it winds up emitted at the end
instead of in the middle.
If we change the processing model from a stack to a queue, the correct order is
preserved, and the final state is:
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
This is what I did in https://github.com/apache/kafka/pull/8015
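To make the ordering difference concrete, here is a toy model of the two
processing strategies. It is only a sketch and is not the TopologyTestDriver
code or the change in the pull request: the class name, the string labels, and
the INTERNAL map are all hypothetical stand-ins for the records in the
walkthrough, and the J table update is left out for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ProcessingOrderSketch {
    // Hypothetical labels for the records in the walkthrough.
    static final String SUBSCRIPTION = "fk-subscription";
    static final String RESPONSE = "fk-subscription-response";
    static final String OUT_STALE = "OUT(Z, Pair(X, Y))"; // incorrect intermediate result
    static final String OUT_FRESH = "OUT(Z, Pair(Z, Y))"; // correct final result

    // Toy topology: what each internal-topic record produces when processed.
    // Any record that is not a key here is treated as a pure output record.
    static final Map<String, List<String>> INTERNAL = new HashMap<>();
    static {
        INTERNAL.put(SUBSCRIPTION, Collections.singletonList(RESPONSE));
        INTERNAL.put(RESPONSE, Collections.singletonList(OUT_FRESH));
    }

    // Recursive (stack-like) model: an internal record, and everything it
    // triggers, is handled before the next captured record is looked at.
    static void processRecursively(List<String> captured, List<String> outputs) {
        for (String record : captured) {
            List<String> produced = INTERNAL.get(record);
            if (produced != null) {
                processRecursively(produced, outputs);
            } else {
                outputs.add(record);
            }
        }
    }

    // Queue model: newly produced records wait behind the ones captured earlier.
    static List<String> processWithQueue(List<String> captured) {
        List<String> outputs = new ArrayList<>();
        Deque<String> queue = new ArrayDeque<>(captured);
        while (!queue.isEmpty()) {
            String record = queue.poll();
            List<String> produced = INTERNAL.get(record);
            if (produced != null) {
                queue.addAll(produced); // appended at the tail
            } else {
                outputs.add(record);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Records captured right after piping the update to a1.
        List<String> captured = Arrays.asList(SUBSCRIPTION, OUT_STALE);
        List<String> recursive = new ArrayList<>();
        processRecursively(captured, recursive);
        System.out.println("recursive: " + recursive);
        System.out.println("queue:     " + processWithQueue(captured));
    }
}
```

In this model the recursive strategy emits the stale OUT record last, so it
becomes the final state, while the queue strategy emits it first and the
correct record last.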