[kafka] branch trunk updated: MINOR: Update the javadoc of SocketServer#startup(). (#7215)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b13391e MINOR: Update the javadoc of SocketServer#startup(). (#7215) b13391e is described below commit b13391e0fe0b249d46e47d886fbdafaaed7c1eef Author: David Jacot AuthorDate: Thu Aug 15 09:53:56 2019 -0700 MINOR: Update the javadoc of SocketServer#startup(). (#7215) Update the javadoc of SocketServer#startup(). SocketServer#startProcessors() does not exist any more and it has been replaced by SocketServer#startDataPlaneProcessors() and SocketServer#startControlPlaneProcessor(). Reviewers: Jason Gustafson --- core/src/main/scala/kafka/network/SocketServer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 69f02ae..0ebded8 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -103,7 +103,8 @@ class SocketServer(val config: KafkaConfig, /** * Start the socket server. Acceptors for all the listeners are started. Processors * are started if `startupProcessors` is true. If not, processors are only started when - * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed starting of processors + * [[kafka.network.SocketServer#startDataPlaneProcessors()]] or + * [[kafka.network.SocketServer#startControlPlaneProcessor()]] is invoked. Delayed starting of processors * is used to delay processing client connections until server is fully initialized, e.g. * to ensure that all credentials have been loaded before authentications are performed. * Acceptors are always started during `startup` so that the bound port is known when this
[kafka] branch trunk updated: MINOR: Use max retries for consumer group tests to avoid flakiness (#7186)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 14215d1 MINOR: Use max retries for consumer group tests to avoid flakiness (#7186) 14215d1 is described below commit 14215d1b84e937c4656e0984c1ce76d9aac65bdd Author: Bob Barrett AuthorDate: Thu Aug 15 13:47:56 2019 -0700 MINOR: Use max retries for consumer group tests to avoid flakiness (#7186) This patch updates ConsumerGroupCommandTest.scala to use the maximum possible number of AdminClient retries. The test runs will still be bounded by the request timeout. This addresses flakiness in tests such as testResetOffsetsNotExistingGroup and testResetOffsetsExistingTopic, which was caused by group coordinators being intermittently unavailable. Reviewers: Ismael Juma , Jason Gustafson --- .../scala/kafka/admin/ConsumerGroupCommand.scala | 8 ++-- .../kafka/admin/ConsumerGroupCommandTest.scala | 5 ++- .../kafka/admin/DeleteConsumerGroupsTest.scala | 24 +-- .../kafka/admin/DescribeConsumerGroupTest.scala| 46 +++--- .../unit/kafka/admin/ListConsumerGroupTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 23 --- 6 files changed, 51 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 3f2ed32..8c4c5e4 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -158,9 +158,10 @@ object ConsumerGroupCommand extends Logging { } } - class ConsumerGroupService(val opts: ConsumerGroupCommandOptions) { + class ConsumerGroupService(val opts: ConsumerGroupCommandOptions, + private[admin] val configOverrides: Map[String, String] = Map.empty) { -private val adminClient = createAdminClient() +private val adminClient = 
createAdminClient(configOverrides) // `consumers` are only needed for `describe`, so we instantiate them lazily private lazy val consumers: mutable.Map[String, KafkaConsumer[String, String]] = mutable.Map.empty @@ -528,9 +529,10 @@ object ConsumerGroupCommand extends Logging { ) } -private def createAdminClient(): Admin = { +private def createAdminClient(configOverrides: Map[String, String]): Admin = { val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties() props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) + configOverrides.foreach { case (k, v) => props.put(k, v)} admin.AdminClient.create(props) } diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index d5eea98..c398940 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -25,14 +25,15 @@ import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGr import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.serialization.StringDeserializer import org.junit.{After, Before} -import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer class ConsumerGroupCommandTest extends KafkaServerTestHarness { import ConsumerGroupCommandTest._ @@ -84,7 +85,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { def getConsumerGroupService(args: Array[String]): ConsumerGroupService = { val 
opts = new ConsumerGroupCommandOptions(args) -val service = new ConsumerGroupService(opts) +val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString)) consumerGroupService = service :: consumerGroupService service } diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala index 769f33a..63fb84a 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala @@ -71,7 +71,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { TestUtils.waitUntilTrue(() => { service.collectGroupMe
[kafka] branch trunk updated: MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 83a575d MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163) 83a575d is described below commit 83a575d4b92a7a49917244dbf7cdee0d2c5aa253 Author: Mickael Maison AuthorDate: Thu Aug 15 23:49:29 2019 +0100 MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163) Reviewers: Colin P. McCabe --- .../kafka/common/utils/ImplicitLinkedHashCollection.java | 2 +- .../common/utils/ImplicitLinkedHashMultiCollection.java | 4 ++-- .../kafka/common/utils/ImplicitLinkedHashCollectionTest.java | 12 +++- .../common/utils/ImplicitLinkedHashMultiCollectionTest.java | 10 ++ 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java index e060629..fba7d7a 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java @@ -304,7 +304,7 @@ public class ImplicitLinkedHashCollection findAll(E key) { -if (key == null) { +if (key == null || size() == 0) { return Collections.emptyList(); } ArrayList results = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java index 8c102dd..389c24e 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java @@ -31,6 +31,7 @@ import java.util.Random; import java.util.Set; import static 
org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; @@ -271,7 +272,7 @@ public class ImplicitLinkedHashCollectionTest { @Test public void testEmptyListIterator() { ImplicitLinkedHashCollection coll = new ImplicitLinkedHashCollection<>(); -ListIterator iter = coll.valuesList().listIterator(); +ListIterator iter = coll.valuesList().listIterator(); assertFalse(iter.hasNext()); assertFalse(iter.hasPrevious()); assertEquals(0, iter.nextIndex()); @@ -513,6 +514,14 @@ public class ImplicitLinkedHashCollectionTest { assertNotEquals(coll2, coll3); } +@Test +public void testFindContainsRemoveOnEmptyCollection() { +ImplicitLinkedHashCollection coll = new ImplicitLinkedHashCollection<>(); +assertNull(coll.find(new TestElement(2))); +assertFalse(coll.contains(new TestElement(2))); +assertFalse(coll.remove(new TestElement(2))); +} + private void addRandomElement(Random random, LinkedHashSet existing, ImplicitLinkedHashCollection set) { int next; @@ -523,6 +532,7 @@ public class ImplicitLinkedHashCollectionTest { set.add(new TestElement(next)); } +@SuppressWarnings("unlikely-arg-type") private void removeRandomElement(Random random, Collection existing, ImplicitLinkedHashCollection coll) { int removeIdx = random.nextInt(existing.size()); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java index 8d2b850..ad87b55 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java @@ -28,6 +28,7 @@ import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -45,6 +46,15 @@ public class ImplicitLinkedHashMultiCollectionTest { } @Test +public void testFindFindAllContainsRemoveOnEmptyCollection() { +ImplicitLinkedHashMultiCollection coll = new ImplicitLinkedHashMultiCollection<>(); +assertNull(coll.find(new TestElement(2))); +assertFalse(coll.contains(new TestElement(2))); +assertFalse(coll.remove(new TestElement(2))); +assertTrue(coll.findAll(new TestElement(2)).isEmpty()); +} + +@Test public voi
[kafka] branch 2.3 updated: MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 9cb27f5 MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163) 9cb27f5 is described below commit 9cb27f5f80b82aca65ccb89f3b9ba4d07222c9df Author: Mickael Maison AuthorDate: Thu Aug 15 23:49:29 2019 +0100 MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163) Reviewers: Colin P. McCabe --- .../kafka/common/utils/ImplicitLinkedHashCollection.java | 2 +- .../common/utils/ImplicitLinkedHashMultiCollection.java | 4 ++-- .../kafka/common/utils/ImplicitLinkedHashCollectionTest.java | 12 +++- .../common/utils/ImplicitLinkedHashMultiCollectionTest.java | 10 ++ 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java index e060629..fba7d7a 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java @@ -304,7 +304,7 @@ public class ImplicitLinkedHashCollection findAll(E key) { -if (key == null) { +if (key == null || size() == 0) { return Collections.emptyList(); } ArrayList results = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java index 8c102dd..389c24e 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java @@ -31,6 +31,7 @@ import java.util.Random; import java.util.Set; import static 
org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; @@ -271,7 +272,7 @@ public class ImplicitLinkedHashCollectionTest { @Test public void testEmptyListIterator() { ImplicitLinkedHashCollection coll = new ImplicitLinkedHashCollection<>(); -ListIterator iter = coll.valuesList().listIterator(); +ListIterator iter = coll.valuesList().listIterator(); assertFalse(iter.hasNext()); assertFalse(iter.hasPrevious()); assertEquals(0, iter.nextIndex()); @@ -513,6 +514,14 @@ public class ImplicitLinkedHashCollectionTest { assertNotEquals(coll2, coll3); } +@Test +public void testFindContainsRemoveOnEmptyCollection() { +ImplicitLinkedHashCollection coll = new ImplicitLinkedHashCollection<>(); +assertNull(coll.find(new TestElement(2))); +assertFalse(coll.contains(new TestElement(2))); +assertFalse(coll.remove(new TestElement(2))); +} + private void addRandomElement(Random random, LinkedHashSet existing, ImplicitLinkedHashCollection set) { int next; @@ -523,6 +532,7 @@ public class ImplicitLinkedHashCollectionTest { set.add(new TestElement(next)); } +@SuppressWarnings("unlikely-arg-type") private void removeRandomElement(Random random, Collection existing, ImplicitLinkedHashCollection coll) { int removeIdx = random.nextInt(existing.size()); diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java index 8d2b850..ad87b55 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java @@ -28,6 +28,7 @@ import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -45,6 +46,15 @@ public class ImplicitLinkedHashMultiCollectionTest { } @Test +public void testFindFindAllContainsRemoveOnEmptyCollection() { +ImplicitLinkedHashMultiCollection coll = new ImplicitLinkedHashMultiCollection<>(); +assertNull(coll.find(new TestElement(2))); +assertFalse(coll.contains(new TestElement(2))); +assertFalse(coll.remove(new TestElement(2))); +assertTrue(coll.findAll(new TestElement(2)).isEmpty()); +} + +@Test public void te