This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e4ae06d12 MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
5e4ae06d12 is described below

commit 5e4ae06d129f01d94ed27f73ea311a2adb0477e7
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Thu Jul 21 12:12:29 2022 -0700

    MINOR: fix flaky test test_standby_tasks_rebalance (#12428)
    
    * Description
    In this test, when the third proc joins, other rebalance scenarios can
    occur: for example, a follow-up joingroup request may arrive before one
    of the procs has received the syncgroup response, so the tasks
    previously assigned to that proc are lost during the new joingroup
    request. This can result in standby tasks being assigned as 3, 1, 2.
    This PR relaxes the expected assignment of 2, 2, 2 to a range of [1-3].
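    
    As a minimal sketch of why the relaxed pattern works, assuming
    wait_for_verification treats its expected string as a regular
    expression (the matches helper below is hypothetical, for illustration
    only):
    
        import re
        
        def matches(line):
            # "[1-3]" is a regex character class, so standby counts of
            # 1, 2 and 3 are all accepted by the relaxed pattern.
            return re.search(r"ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", line) is not None
        
        assert matches("ACTIVE_TASKS:2 STANDBY_TASKS:2")  # the even case
        assert matches("ACTIVE_TASKS:2 STANDBY_TASKS:3")  # the 3, 1, 2 case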
    
    * Some background from Guozhang:
    I talked to @hao Li offline and also inspected the code a bit, and the
    tl;dr is that I think the code logic is correct (i.e. we do not really
    have a bug), but we need to relax the test verification a little bit.
    The general idea behind the subscription info is:
    
    When a client joins the group, its subscription tries to encode all of
    its currently assigned active and standby tasks, which are used as the
    prev active and standby tasks by the assignor in order to achieve some
    stickiness.
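    
    As an illustrative model (plain Python, not the actual Streams
    classes; every name here is made up), the stickiness input can be
    pictured as:
    
        # Each client's subscription reports the tasks it currently owns;
        # the assignor reads these as "prev" tasks and prefers handing a
        # task back to a client that already holds its state.
        subscriptions = {
            "proc1": {"prev_active": {"0_0", "0_1"}, "prev_standby": {"0_2"}},
            "proc2": {"prev_active": {"0_2"}, "prev_standby": {"0_0", "0_1"}},
        }
        
        def previously_owned(client, task):
            sub = subscriptions[client]
            return task in sub["prev_active"] or task in sub["prev_standby"]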
    
    When a client drops all its active/standby tasks due to errors, it does
    not actually report an empty set in its subscription; instead it checks
    its local state directory (you can see that in
    TaskManager#getTaskOffsetSums, which populates the task offset sums).
    For an active task, the reported offset is “-2”, a.k.a. LATEST_OFFSET;
    for a standby task, it is an actual numerical offset.
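    
    A rough Python model of that reporting (the real logic lives in the
    Java TaskManager#getTaskOffsetSums; the function below is only an
    assumption-laden sketch):
    
        LATEST_OFFSET = -2  # sentinel meaning "this task is active"
        
        def task_offset_sums(active_tasks, local_state_offsets):
            # Active tasks report the LATEST_OFFSET sentinel; every other
            # task found in the local state directory reports its actual
            # numeric offset sum, even if the client just lost it.
            sums = {task: LATEST_OFFSET for task in active_tasks}
            for task, offset in local_state_offsets.items():
                sums.setdefault(task, offset)
            return sums
        
        # proc2 after dropping everything: no active tasks, but six tasks
        # with local state, so all six are still reported (as standbys).
        task_offset_sums(set(), {"0_%d" % i: 100 for i in range(6)})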
    
    So in this case, proc2, which drops all its active and standby tasks,
    would still report every task that retains some local state. Since it
    previously owned all six tasks (three as active and three as standby),
    it would report all six as standbys, and when that happens the
    resulting assignment, as @hao Li verified, is indeed the uneven one.
    
    So I think the actual “issue” here is that proc2 is a bit late sending
    the sync-group request, after the previous rebalance has already
    completed and a follow-up rebalance has already been triggered; in that
    case, the resulting uneven assignment is indeed expected. Such a
    scenario, though not common, is still legitimate, since in practice all
    kinds of timing skew across instances can happen. So I think we should
    just relax our verification here, i.e. just make sure that each  [...]
    
    Reviewers: Suhas Satish <ssat...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../tests/streams/streams_standby_replica_test.py     | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index a8c07513c1..c0e5953f73 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -73,9 +73,9 @@ class StreamsStandbyTask(BaseStreamsTest):
 
         processor_3.start()
 
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE)
 
         processor_1.stop()
 
@@ -93,9 +93,9 @@ class StreamsStandbyTask(BaseStreamsTest):
 
         processor_2.start()
 
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE, num_lines=2)
 
         processor_3.stop()
 
@@ -112,10 +112,9 @@ class StreamsStandbyTask(BaseStreamsTest):
         self.wait_for_verification(processor_2, "ACTIVE_TASKS:3 STANDBY_TASKS:3", processor_2.STDOUT_FILE, num_lines=2)
 
         processor_1.start()
-
-        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_1.STDOUT_FILE)
-        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_3.STDOUT_FILE)
-        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:2", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
+        self.wait_for_verification(processor_2, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_2.STDOUT_FILE, num_lines=2)
+        self.wait_for_verification(processor_3, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_3.STDOUT_FILE)
 
         self.assert_consume(self.client_id, "assert all messages consumed from %s" % self.streams_sink_topic_1,
                             self.streams_sink_topic_1, self.num_messages)
