[kafka] branch trunk updated: MINOR: Update the javadoc of SocketServer#startup(). (#7215)

2019-08-15 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new b13391e  MINOR: Update the javadoc of SocketServer#startup(). (#7215)
b13391e is described below

commit b13391e0fe0b249d46e47d886fbdafaaed7c1eef
Author: David Jacot 
AuthorDate: Thu Aug 15 09:53:56 2019 -0700

MINOR: Update the javadoc of SocketServer#startup(). (#7215)

Update the javadoc of SockerServer#startup(). 
SocketServer#startProcessors() does not exist any more and it has been replaced 
by SocketServer#startDataPlaneProcessors() and 
SocketServer#startControlPlaneProcessor().

Reviewers: Jason Gustafson 
---
 core/src/main/scala/kafka/network/SocketServer.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 69f02ae..0ebded8 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -103,7 +103,8 @@ class SocketServer(val config: KafkaConfig,
   /**
* Start the socket server. Acceptors for all the listeners are started. 
Processors
* are started if `startupProcessors` is true. If not, processors are only 
started when
-   * [[kafka.network.SocketServer#startProcessors()]] is invoked. Delayed 
starting of processors
+   * [[kafka.network.SocketServer#startDataPlaneProcessors()]] or
+   * [[kafka.network.SocketServer#startControlPlaneProcessor()]] is invoked. 
Delayed starting of processors
* is used to delay processing client connections until server is fully 
initialized, e.g.
* to ensure that all credentials have been loaded before authentications 
are performed.
* Acceptors are always started during `startup` so that the bound port is 
known when this



[kafka] branch trunk updated: MINOR: Use max retries for consumer group tests to avoid flakiness (#7186)

2019-08-15 Thread jgus
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 14215d1  MINOR: Use max retries for consumer group tests to avoid 
flakiness (#7186)
14215d1 is described below

commit 14215d1b84e937c4656e0984c1ce76d9aac65bdd
Author: Bob Barrett 
AuthorDate: Thu Aug 15 13:47:56 2019 -0700

MINOR: Use max retries for consumer group tests to avoid flakiness (#7186)

This patch updates ConsumerGroupCommandTest.scala to use the maximum 
possible number of AdminClient retries. The test runs will still be bounded by 
the request timeout. This address flakiness in tests such as 
testResetOffsetsNotExistingGroup and testResetOffsetsExistingTopic, which was 
caused by group coordinators being intermittently unavailable.

Reviewers: Ismael Juma , Jason Gustafson 

---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  8 ++--
 .../kafka/admin/ConsumerGroupCommandTest.scala |  5 ++-
 .../kafka/admin/DeleteConsumerGroupsTest.scala | 24 +--
 .../kafka/admin/DescribeConsumerGroupTest.scala| 46 +++---
 .../unit/kafka/admin/ListConsumerGroupTest.scala   |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala| 23 ---
 6 files changed, 51 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 3f2ed32..8c4c5e4 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -158,9 +158,10 @@ object ConsumerGroupCommand extends Logging {
 }
   }
 
-  class ConsumerGroupService(val opts: ConsumerGroupCommandOptions) {
+  class ConsumerGroupService(val opts: ConsumerGroupCommandOptions,
+ private[admin] val configOverrides: Map[String, 
String] = Map.empty) {
 
-private val adminClient = createAdminClient()
+private val adminClient = createAdminClient(configOverrides)
 
 // `consumers` are only needed for `describe`, so we instantiate them 
lazily
 private lazy val consumers: mutable.Map[String, KafkaConsumer[String, 
String]] = mutable.Map.empty
@@ -528,9 +529,10 @@ object ConsumerGroupCommand extends Logging {
   )
 }
 
-private def createAdminClient(): Admin = {
+private def createAdminClient(configOverrides: Map[String, String]): Admin 
= {
   val props = if (opts.options.has(opts.commandConfigOpt)) 
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new 
Properties()
   props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
+  configOverrides.foreach { case (k, v) => props.put(k, v)}
   admin.AdminClient.create(props)
 }
 
diff --git 
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index d5eea98..c398940 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -25,14 +25,15 @@ import 
kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGr
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.{After, Before}
 
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 class ConsumerGroupCommandTest extends KafkaServerTestHarness {
   import ConsumerGroupCommandTest._
@@ -84,7 +85,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness 
{
 
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
 val opts = new ConsumerGroupCommandOptions(args)
-val service = new ConsumerGroupService(opts)
+val service = new ConsumerGroupService(opts, 
Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
 consumerGroupService = service :: consumerGroupService
 service
   }
diff --git 
a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 769f33a..63fb84a 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -71,7 +71,7 @@ class DeleteConsumerGroupsTest extends 
ConsumerGroupCommandTest {
 
 TestUtils.waitUntilTrue(() => {
   service.collectGroupMe

[kafka] branch trunk updated: MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163)

2019-08-15 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 83a575d  MINOR: Fix bugs in handling zero-length 
ImplicitLinkedHashCollections (#7163)
83a575d is described below

commit 83a575d4b92a7a49917244dbf7cdee0d2c5aa253
Author: Mickael Maison 
AuthorDate: Thu Aug 15 23:49:29 2019 +0100

MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections 
(#7163)

Reviewers: Colin P. McCabe 
---
 .../kafka/common/utils/ImplicitLinkedHashCollection.java |  2 +-
 .../common/utils/ImplicitLinkedHashMultiCollection.java  |  4 ++--
 .../kafka/common/utils/ImplicitLinkedHashCollectionTest.java | 12 +++-
 .../common/utils/ImplicitLinkedHashMultiCollectionTest.java  | 10 ++
 4 files changed, 24 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
index e060629..fba7d7a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
@@ -304,7 +304,7 @@ public class ImplicitLinkedHashCollection findAll(E key) {
-if (key == null) {
+if (key == null || size() == 0) {
 return Collections.emptyList();
 }
 ArrayList results = new ArrayList<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
index 8c102dd..389c24e 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -31,6 +31,7 @@ import java.util.Random;
 import java.util.Set;
 
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
@@ -271,7 +272,7 @@ public class ImplicitLinkedHashCollectionTest {
 @Test
 public void testEmptyListIterator() {
 ImplicitLinkedHashCollection coll = new 
ImplicitLinkedHashCollection<>();
-ListIterator iter = coll.valuesList().listIterator();
+ListIterator iter = coll.valuesList().listIterator();
 assertFalse(iter.hasNext());
 assertFalse(iter.hasPrevious());
 assertEquals(0, iter.nextIndex());
@@ -513,6 +514,14 @@ public class ImplicitLinkedHashCollectionTest {
 assertNotEquals(coll2, coll3);
 }
 
+@Test
+public void testFindContainsRemoveOnEmptyCollection() {
+ImplicitLinkedHashCollection coll = new 
ImplicitLinkedHashCollection<>();
+assertNull(coll.find(new TestElement(2)));
+assertFalse(coll.contains(new TestElement(2)));
+assertFalse(coll.remove(new TestElement(2)));
+}
+
 private void addRandomElement(Random random, LinkedHashSet 
existing,
   ImplicitLinkedHashCollection 
set) {
 int next;
@@ -523,6 +532,7 @@ public class ImplicitLinkedHashCollectionTest {
 set.add(new TestElement(next));
 }
 
+@SuppressWarnings("unlikely-arg-type")
 private void removeRandomElement(Random random, Collection 
existing,
  ImplicitLinkedHashCollection 
coll) {
 int removeIdx = random.nextInt(existing.size());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
index 8d2b850..ad87b55 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
@@ -28,6 +28,7 @@ import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -45,6 +46,15 @@ public class ImplicitLinkedHashMultiCollectionTest {
 }
 
 @Test
+public void testFindFindAllContainsRemoveOnEmptyCollection() {
+ImplicitLinkedHashMultiCollection coll = new 
ImplicitLinkedHashMultiCollection<>();
+assertNull(coll.find(new TestElement(2)));
+assertFalse(coll.contains(new TestElement(2)));
+assertFalse(coll.remove(new TestElement(2)));
+assertTrue(coll.findAll(new TestElement(2)).isEmpty());
+}
+
+@Test
 public voi

[kafka] branch 2.3 updated: MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections (#7163)

2019-08-15 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
 new 9cb27f5  MINOR: Fix bugs in handling zero-length 
ImplicitLinkedHashCollections (#7163)
9cb27f5 is described below

commit 9cb27f5f80b82aca65ccb89f3b9ba4d07222c9df
Author: Mickael Maison 
AuthorDate: Thu Aug 15 23:49:29 2019 +0100

MINOR: Fix bugs in handling zero-length ImplicitLinkedHashCollections 
(#7163)

Reviewers: Colin P. McCabe 
---
 .../kafka/common/utils/ImplicitLinkedHashCollection.java |  2 +-
 .../common/utils/ImplicitLinkedHashMultiCollection.java  |  4 ++--
 .../kafka/common/utils/ImplicitLinkedHashCollectionTest.java | 12 +++-
 .../common/utils/ImplicitLinkedHashMultiCollectionTest.java  | 10 ++
 4 files changed, 24 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
index e060629..fba7d7a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
@@ -304,7 +304,7 @@ public class ImplicitLinkedHashCollection findAll(E key) {
-if (key == null) {
+if (key == null || size() == 0) {
 return Collections.emptyList();
 }
 ArrayList results = new ArrayList<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
index 8c102dd..389c24e 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -31,6 +31,7 @@ import java.util.Random;
 import java.util.Set;
 
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
@@ -271,7 +272,7 @@ public class ImplicitLinkedHashCollectionTest {
 @Test
 public void testEmptyListIterator() {
 ImplicitLinkedHashCollection coll = new 
ImplicitLinkedHashCollection<>();
-ListIterator iter = coll.valuesList().listIterator();
+ListIterator iter = coll.valuesList().listIterator();
 assertFalse(iter.hasNext());
 assertFalse(iter.hasPrevious());
 assertEquals(0, iter.nextIndex());
@@ -513,6 +514,14 @@ public class ImplicitLinkedHashCollectionTest {
 assertNotEquals(coll2, coll3);
 }
 
+@Test
+public void testFindContainsRemoveOnEmptyCollection() {
+ImplicitLinkedHashCollection coll = new 
ImplicitLinkedHashCollection<>();
+assertNull(coll.find(new TestElement(2)));
+assertFalse(coll.contains(new TestElement(2)));
+assertFalse(coll.remove(new TestElement(2)));
+}
+
 private void addRandomElement(Random random, LinkedHashSet 
existing,
   ImplicitLinkedHashCollection 
set) {
 int next;
@@ -523,6 +532,7 @@ public class ImplicitLinkedHashCollectionTest {
 set.add(new TestElement(next));
 }
 
+@SuppressWarnings("unlikely-arg-type")
 private void removeRandomElement(Random random, Collection 
existing,
  ImplicitLinkedHashCollection 
coll) {
 int removeIdx = random.nextInt(existing.size());
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
index 8d2b850..ad87b55 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
@@ -28,6 +28,7 @@ import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -45,6 +46,15 @@ public class ImplicitLinkedHashMultiCollectionTest {
 }
 
 @Test
+public void testFindFindAllContainsRemoveOnEmptyCollection() {
+ImplicitLinkedHashMultiCollection coll = new 
ImplicitLinkedHashMultiCollection<>();
+assertNull(coll.find(new TestElement(2)));
+assertFalse(coll.contains(new TestElement(2)));
+assertFalse(coll.remove(new TestElement(2)));
+assertTrue(coll.findAll(new TestElement(2)).isEmpty());
+}
+
+@Test
 public void te