[ https://issues.apache.org/jira/browse/KAFKA-10405 ]
Liam Clarke-Hutchinson deleted comment on KAFKA-10405:
------------------------------------------------
was (Author: JIRAUSER279886):
Kia ora rā kōrua (hello to you both) [~mjsax] [~ableegoldman],
I've been looking into this test and its failures.
While I can say _why_ it's failing, I can't explain why it's failing {_}now{_}.
The test expects a log segment to be deleted, but it isn't: a segment is only
deleted once the next segment's base offset is less than or equal to the log
start offset (LSO), and the LSO that the StreamThread set when it asked for
records up to the committed offsets to be deleted hasn't reached that point.
Judging from the comments, the failure is also sporadic.
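To make that deletion condition concrete, here is a simplified model of it. It is a hypothetical sketch, not the broker's actual UnifiedLog code, and the class and method names are made up. Given a partition's segment base offsets in ascending order, a segment only qualifies once the _next_ segment's base offset is <= the LSO, so the last (active) segment never qualifies this way. If all of the repartition records are still sitting in the active segment because it hasn't rolled yet, advancing the LSO alone frees nothing, which is one plausible way for the test to time out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the segment-deletion condition described
// above; it is NOT the broker's UnifiedLog implementation.
final class SegmentDeletionModel {

    private SegmentDeletionModel() { }

    /**
     * Returns the base offsets of the segments that would be deleted.
     *
     * @param baseOffsets    segment base offsets in ascending order; the last
     *                       entry is the active segment
     * @param logStartOffset the current log start offset (LSO)
     */
    static List<Long> deletableSegments(List<Long> baseOffsets, long logStartOffset) {
        List<Long> deletable = new ArrayList<>();
        // Stop before the last segment: it has no "next" segment, so it never qualifies.
        for (int i = 0; i + 1 < baseOffsets.size(); i++) {
            long nextBaseOffset = baseOffsets.get(i + 1);
            if (nextBaseOffset > logStartOffset) {
                break; // offsets ascend, so no later segment can qualify either
            }
            deletable.add(baseOffsets.get(i));
        }
        return deletable;
    }

    public static void main(String[] args) {
        List<Long> segments = List.of(0L, 100L, 200L);
        System.out.println(deletableSegments(segments, 150L)); // prints [0]
        System.out.println(deletableSegments(segments, 250L)); // prints [0, 100]
        // Everything still in a single active segment: nothing can be deleted.
        System.out.println(deletableSegments(List.of(0L), 500L)); // prints []
    }
}
```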
That said, I would like to propose that the test either be deleted or seriously
revised, for the following reasons:
# As it stands, the test seems more focused on segment deletion behaviour,
which I expect is already tested elsewhere.
# The code under test has at least two, if not three, concurrently moving
parts that run on timers whose default periods the test doesn't set
explicitly: the StreamThread "purges" committed offsets on a timer, UnifiedLog
deletes segments on a timer, and LocalLog rolls segments on a timer (whether a
segment is rolled depends on yet another configured duration). I believe this
makes the test very fragile and likely fundamentally non-deterministic.
# The test contains a great many undocumented, implicit assumptions, which
makes it very hard to refactor or fix without the original author's knowledge.
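One way to reduce that non-determinism would be to pin each of those timers instead of leaving them at their defaults. The helper class below is hypothetical and the values are illustrative, not tuned; whether the existing test already overrides any of them would need checking against its source. `commit.interval.ms`, `log.retention.check.interval.ms`, `segment.ms` and `file.delete.delay.ms` are standard Kafka configs; `repartition.purge.interval.ms` is assumed to exist only in newer Streams versions (KIP-811).

```java
import java.util.Properties;

// Hypothetical helper that pins the timers discussed above so they are not
// left at their defaults. Values are illustrative only.
final class PinnedTimerConfigs {

    private PinnedTimerConfigs() { }

    /** Kafka Streams app: commit cadence and (newer versions) purge cadence. */
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("commit.interval.ms", "100");
        // Assumption: only available in newer Streams versions (KIP-811).
        props.put("repartition.purge.interval.ms", "100");
        return props;
    }

    /** Embedded broker: period of the background check that deletes segments. */
    static Properties brokerConfig() {
        Properties props = new Properties();
        props.put("log.retention.check.interval.ms", "100");
        return props;
    }

    /**
     * Repartition topic: when segments roll and how long deleted segment files
     * linger on disk. In a Streams app these can be passed with the "topic."
     * prefix (StreamsConfig.topicPrefix) so they apply to internal topics.
     */
    static Properties topicConfig() {
        Properties props = new Properties();
        props.put("segment.ms", "100");
        props.put("file.delete.delay.ms", "0");
        return props;
    }
}
```

Pinning the periods doesn't remove the concurrency, but short, explicit values should at least make the worst-case wait easier to reason about than the defaults do.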
I'm very happy to either delete it, or revise it.
But if we go for revision, how do we test what we're trying to test?
From my reading of the code, once the purge timer in the StreamThread fires,
the TaskManager sends a request through the adminClient to delete records up to
the offset of the last committed/consumed record in the repartition topic.
This is accomplished by setting the log start offset (LSO) to the last
committed offset + 1.
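The purge mechanism just described can be modelled without a broker. This is a hypothetical sketch with made-up names, assuming the +1 is applied once, when the purge offset is computed from the last consumed offset, and that the broker only ever moves the LSO forward. In the real client the request would go through `Admin#deleteRecords` with `RecordsToDelete.beforeOffset(...)`; here it is reduced to plain arithmetic.

```java
// Hypothetical model of a repartition-topic purge; not Kafka's actual code.
final class PurgeModel {

    private PurgeModel() { }

    /** Offset the app asks the broker to delete records before: last consumed + 1. */
    static long purgeOffset(long lastConsumedOffset) {
        return lastConsumedOffset + 1;
    }

    /**
     * Broker side: records before beforeOffset become unreadable by advancing
     * the LSO. The LSO never moves backwards, and offsets past the high
     * watermark are rejected (the real broker answers OFFSET_OUT_OF_RANGE).
     */
    static long applyDeleteRecords(long currentLso, long beforeOffset, long highWatermark) {
        if (beforeOffset > highWatermark) {
            throw new IllegalArgumentException(
                "beforeOffset " + beforeOffset + " is past the high watermark " + highWatermark);
        }
        return Math.max(currentLso, beforeOffset);
    }

    public static void main(String[] args) {
        long lso = 0L;
        long highWatermark = 50L;             // 50 records written: offsets 0..49
        long beforeOffset = purgeOffset(49L); // app has consumed and committed offset 49
        lso = applyDeleteRecords(lso, beforeOffset, highWatermark);
        System.out.println(lso); // prints 50
    }
}
```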
So would it be sufficient for this test to verify that the LSO on the topic is
the highest offset sent by the streams app in the deletion request + 1?
At this stage I'm not even sure testing it like this is possible, as it would
involve capturing the offsets the streams app sends via the admin client. But
if it is possible, wouldn't it be a better test of the desired functionality
than peering into topic sizes and relying on assumed behaviour of the broker's
log-side functionality?
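A rough shape for that alternative check, with hypothetical names: record every purge offset the app requests per partition (in a real test, perhaps by wrapping the admin client), then compare the highest requested offset with the LSO observed afterwards, which `Admin#listOffsets` with `OffsetSpec.earliest()` reports as the earliest offset. The sketch assumes nothing else (such as retention) advances the LSO, and that the requested offset already includes the +1, so it expects equality; if the app instead sent the last committed offset itself, the expected LSO would be one higher, as the question above puts it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical recorder for purge requests plus the proposed assertion.
// Partitions are plain strings such as "repartition-0" to stay dependency-free.
final class PurgeRequestRecorder {

    private final Map<String, Long> highestRequested = new HashMap<>();

    /** Call for every deleteRecords request the app sends. */
    void record(String partition, long beforeOffset) {
        highestRequested.merge(partition, beforeOffset, Long::max);
    }

    /**
     * True if, for every partition the app purged, the observed log start
     * offset equals the highest offset the app asked to delete records before.
     */
    boolean lsoMatchesRequests(Map<String, Long> observedLso) {
        if (highestRequested.isEmpty()) {
            return false; // no purge happened yet, so there is nothing to verify
        }
        for (Map.Entry<String, Long> entry : highestRequested.entrySet()) {
            Long lso = observedLso.get(entry.getKey());
            if (lso == null || lso.longValue() != entry.getValue().longValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PurgeRequestRecorder recorder = new PurgeRequestRecorder();
        recorder.record("repartition-0", 20L);
        recorder.record("repartition-0", 50L);
        System.out.println(recorder.lsoMatchesRequests(Map.of("repartition-0", 50L))); // prints true
        System.out.println(recorder.lsoMatchesRequests(Map.of("repartition-0", 20L))); // prints false
    }
}
```

Only the highest request per partition matters because the LSO never moves backwards, so earlier, smaller requests are subsumed by later ones.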
Alternatively, could this failure actually be exposing a bug? I honestly don't
have enough knowledge to answer that.
I've scattered a bunch of logging statements through UnifiedLog and LocalLog to
understand what's happening, and I can share that output if it would help
answer my questions :)
Ngā mihi (kind regards),
Liam Clarke
> Flaky Test
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> -------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
> Issue Type: Test
> Components: streams
> Reporter: Bill Bejeck
> Assignee: Luke Chen
> Priority: Major
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 60000. Repartition topic restore-test-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition not purged data after 60000 ms.
> 14:25:19 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206)
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)