[kafka] branch trunk updated: KAFKA-13592; Fix flaky test ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions (#11687)

2022-06-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 09570f2540 KAFKA-13592; Fix flaky test 
ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions (#11687)
09570f2540 is described below

commit 09570f2540269cc1196c4c69cc7997d035159d1d
Author: Kvicii <42023367+kvi...@users.noreply.github.com>
AuthorDate: Tue Jun 7 08:56:48 2022 +0800

KAFKA-13592; Fix flaky test 
ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions (#11687)

Fixes several race conditions in the test case causing the flaky failure.

Reviewers: Divij Vaidya , Jason Gustafson 


Co-authored-by: Kvicii 
---
 .../controller/ControllerIntegrationTest.scala | 75 ++
 1 file changed, 35 insertions(+), 40 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index d8d90ede09..d49502dd62 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -125,7 +125,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
 val controllerId = TestUtils.waitUntilControllerElected(zkClient)
 // Need to make sure the broker we shutdown and startup are not the 
controller. Otherwise we will send out
-// full UpdateMetadataReuqest to all brokers during controller failover.
+// full UpdateMetadataRequest to all brokers during controller failover.
 val testBroker = servers.filter(e => e.config.brokerId != 
controllerId).head
 val remainingBrokers = servers.filter(_.config.brokerId != 
testBroker.config.brokerId)
 val topic = "topic1"
@@ -526,7 +526,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
 // create the topic
 TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = 
expectedReplicaAssignment, servers = servers)
 
-val controllerId = zkClient.getControllerId.get
+val controllerId = TestUtils.waitUntilControllerElected(zkClient)
 val controller = servers.find(p => p.config.brokerId == 
controllerId).get.kafkaController
 val resultQueue = new 
LinkedBlockingQueue[Try[collection.Set[TopicPartition]]]()
 val controlledShutdownCallback = (controlledShutdownResult: 
Try[collection.Set[TopicPartition]]) => 
resultQueue.put(controlledShutdownResult)
@@ -1253,13 +1253,13 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
 adminZkClient.createTopic(tp.topic, 1, 1)
 waitForPartitionState(tp, firstControllerEpoch, 0, 
LeaderAndIsr.InitialLeaderEpoch,
   "failed to get expected partition state upon topic creation")
-val topicIdAfterCreate = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+val (topicIdAfterCreate, _) = 
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
 assertTrue(topicIdAfterCreate.isDefined)
 assertEquals(topicIdAfterCreate, 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
   "correct topic ID cannot be found in the controller context")
 
 adminZkClient.addPartitions(tp.topic, assignment, 
adminZkClient.getBrokerMetadatas(), 2)
-val topicIdAfterAddition = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+val (topicIdAfterAddition, _) = 
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
 assertEquals(topicIdAfterCreate, topicIdAfterAddition)
 assertEquals(topicIdAfterCreate, 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
   "topic ID changed after partition additions")
@@ -1279,13 +1279,13 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
 adminZkClient.createTopic(tp.topic, 1, 1)
 waitForPartitionState(tp, firstControllerEpoch, 0, 
LeaderAndIsr.InitialLeaderEpoch,
   "failed to get expected partition state upon topic creation")
-val topicIdAfterCreate = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+val (topicIdAfterCreate, _) = 
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
 assertEquals(None, topicIdAfterCreate)
 assertEquals(topicIdAfterCreate, 
servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
   "incorrect topic ID can be found in the controller context")
 
 adminZkClient.addPartitions(tp.topic, assignment, 
adminZkClient.getBrokerMetadatas(), 2)
-val topicIdAfterAddition = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+val (topicIdAfterAddition, _) = 

[kafka] branch trunk updated: MINOR: A few method javadoc and typo fixes (#12253)

2022-06-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 75223b668d MINOR: A few method javadoc and typo fixes (#12253)
75223b668d is described below

commit 75223b668d24f630670ece03d29de631e6f8bd20
Author: bozhao12 <102274736+bozha...@users.noreply.github.com>
AuthorDate: Tue Jun 7 03:25:05 2022 +0800

MINOR: A few method javadoc and typo fixes (#12253)

Fixes an unneeded parameter doc in `MemoryRecordsBuilder` and a typo in 
`LazyDownConversionRecordsSend`.

Reviewers: Kvicii <42023367+kvi...@users.noreply.github.com>, Jason 
Gustafson 

Co-authored-by: zhaobo 
---
 .../org/apache/kafka/common/record/LazyDownConversionRecordsSend.java   | 2 +-
 .../main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java  | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
index 17addef74d..0117651845 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -87,7 +87,7 @@ public final class LazyDownConversionRecordsSend extends 
RecordsSend

[kafka] branch trunk updated: HOTFIX: add space to avoid checkstyle failure

2022-06-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 5d593287c7 HOTFIX: add space to avoid checkstyle failure
5d593287c7 is described below

commit 5d593287c7db7030c444a469db72d59b3883d904
Author: Guozhang Wang 
AuthorDate: Mon Jun 6 11:34:59 2022 -0700

HOTFIX: add space to avoid checkstyle failure
---
 .../kafka/clients/consumer/internals/ConsumerCoordinatorTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 180053ee22..c65d33176f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -558,7 +558,7 @@ public abstract class ConsumerCoordinatorTest {
 
 // should try to find coordinator since we are commit async
 coordinator.commitOffsetsAsync(singletonMap(t1p, new 
OffsetAndMetadata(100L)), (offsets, exception) -> {
-fail("Commit should not get responses, but got offsets:" + offsets 
+", and exception:" + exception);
+fail("Commit should not get responses, but got offsets:" + offsets 
+ ", and exception:" + exception);
 });
 coordinator.poll(time.timer(0));
 assertTrue(coordinator.coordinatorUnknown());



[kafka] branch 3.2 updated (90db4f47d6 -> b61edf2037)

2022-06-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a change to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 90db4f47d6 KAFKA-13773: catch kafkaStorageException to avoid broker 
shutdown directly (#12136)
 add 173b8fd26d HOTFIX: only try to clear discover-coordinator future upon 
commit (#12244)
 add b61edf2037 HOTFIX: add space to avoid checkstyle failure

No new revisions were added by this update.

Summary of changes:
 .../consumer/internals/ConsumerCoordinator.java| 23 +++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  1 -
 .../internals/ConsumerCoordinatorTest.java | 57 --
 3 files changed, 73 insertions(+), 8 deletions(-)



[kafka] branch trunk updated: MINOR: Correctly mark some tests as integration tests (#12223)

2022-06-06 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 601051354b MINOR: Correctly mark some tests as integration tests 
(#12223)
601051354b is described below

commit 601051354b4a1c6dede521c2966bbf7eb189f99d
Author: Divij Vaidya 
AuthorDate: Mon Jun 6 20:18:24 2022 +0200

MINOR: Correctly mark some tests as integration tests (#12223)

Also fix package name of `ListOffsetsIntegrationTest`.

Reviewers: dengziming , Jason Gustafson 

---
 .../scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala | 2 +-
 .../integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala | 1 -
 .../streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java | 3 +++
 .../KTableKTableForeignKeyJoinMaterializationIntegrationTest.java  | 3 +++
 .../apache/kafka/streams/integration/NamedTopologyIntegrationTest.java | 3 +++
 5 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index ccc2bdecc8..2a148947fd 100644
--- 
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package integration.kafka.admin
+package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
index c9d40cadb0..4d48bf5a86 100644
--- 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala
@@ -15,7 +15,6 @@ package kafka.api
 
 import java.util
 import java.util.Properties
-
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 176d27a2b3..931aaf8e53 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -33,10 +33,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
@@ -60,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 
 @RunWith(Parameterized.class)
+@Category(IntegrationTest.class)
 public class KTableKTableForeignKeyJoinIntegrationTest {
 @Rule
 public Timeout globalTimeout = Timeout.seconds(600);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 5b63a655e2..2a36556c99 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -32,10 +32,12 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
@@ -58,6 +60,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 
 @RunWith(Parameterized.class)
+@Category(IntegrationTest.class)
 public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
 @Rule
 public Timeout globalTimeout = Timeout.seconds(600);
diff 

[kafka] branch trunk updated: HOTFIX: only try to clear discover-coordinator future upon commit (#12244)

2022-06-06 Thread guozhang
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 2047fc3715 HOTFIX: only try to clear discover-coordinator future upon 
commit (#12244)
2047fc3715 is described below

commit 2047fc371500286ba3e41d16551d2067edad
Author: Guozhang Wang 
AuthorDate: Mon Jun 6 11:05:41 2022 -0700

HOTFIX: only try to clear discover-coordinator future upon commit (#12244)

This is another way of fixing KAFKA-13563 other than #11631.

Instead of letting the consumer always try to discover the coordinator in 
poll with either mode (subscribe / assign), we defer clearing the 
discover-coordinator future to async commits only. More specifically, under 
manual assign mode, there are only three places where we need the coordinator:

* commitAsync (both by the consumer itself or triggered by caller), this is 
where we want to fix.
* commitSync, where we already try to re-discover the coordinator.
* committed (both by the consumer itself based on reset policy, or 
triggered by the caller), where we already try to re-discover the coordinator.

The benefit is that, under manual assign mode, if none of the above three 
operations is triggered, we never discover the coordinator at all. The 
original fix in #11631 would let the consumer discover the coordinator even if 
none of the above operations are required.

Reviewers: Luke Chen , David Jacot 
---
 .../consumer/internals/ConsumerCoordinator.java| 23 +++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  1 -
 .../internals/ConsumerCoordinatorTest.java | 57 --
 3 files changed, 73 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 51fa0b62ed..b853ff99e8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.time.Duration;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -548,14 +549,18 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 }
 }
 } else {
-// For manually assigned partitions, if coordinator is unknown, 
make sure we lookup one and await metadata.
+// For manually assigned partitions, we do not try to pro-actively 
lookup coordinator;
+// instead we only try to refresh metadata when necessary.
 // If connections to all nodes fail, wakeups triggered while 
attempting to send fetch
 // requests result in polls returning immediately, causing a tight 
loop of polls. Without
 // the wakeup, poll() with no channels would block for the 
timeout, delaying re-connection.
 // awaitMetadataUpdate() in ensureCoordinatorReady initiates new 
connections with configured backoff and avoids the busy loop.
-if (coordinatorUnknownAndUnready(timer)) {
-return false;
+if (metadata.updateRequested() && 
!client.hasReadyNodes(timer.currentTimeMs())) {
+client.awaitMetadataUpdate(timer);
 }
+
+// if there are pending coordinator requests, ensure they have a 
chance to be transmitted.
+client.pollNoWakeup();
 }
 
 maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
@@ -1004,7 +1009,17 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 if (offsets.isEmpty()) {
 // No need to check coordinator if offsets is empty since commit 
of empty offsets is completed locally.
 future = doCommitOffsetsAsync(offsets, callback);
-} else if (!coordinatorUnknown()) {
+} else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) {
+// we need to make sure coordinator is ready before committing, 
since
+// this is for async committing we do not try to block, but just 
try once to
+// clear the previous discover-coordinator future, resend, or get 
responses;
+// if the coordinator is not ready yet then we would just proceed 
and put that into the
+// pending requests, and future poll calls would still try to 
complete them.
+//
+// the key here though is that we have to try sending the 
discover-coordinator if
+// it's not known or ready, since this is the only place we can 
send such request
+// under manual assignment (there