Repository: kafka
Updated Branches:
  refs/heads/trunk 151b23689 -> 0483a0b0b


MINOR: Fix transient failure of testCannotSendToInternalTopic

It’s a simple matter of creating the internal topic before trying to send
to it. Otherwise, the send can fail with an `UnknownTopicOrPartitionException`
instead of the expected `InvalidTopicException`, making the test flaky.

Without the change, I could reproduce a failure in less than
5 runs. With the change, 30 consecutive runs succeeded.
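
In test terms, the fix reduces to the pattern below (a sketch distilled from
the diff that follows; producer2, zkUtils and servers are assumed to be the
fixtures set up by ProducerFailureHandlingTest):

    @Test
    def testCannotSendToInternalTopic() {
      // Create __consumer_offsets up front so its metadata has propagated to
      // all brokers before the send; this removes the race that produced the
      // spurious UnknownTopicOrPartitionException.
      TestUtils.createOffsetsTopic(zkUtils, servers)

      // With the topic known to every broker, the only remaining failure mode
      // is the broker's internal-topic check, which is what we assert on.
      val thrown = intercept[ExecutionException] {
        producer2.send(new ProducerRecord(Topic.GroupMetadataTopicName,
          "test".getBytes, "test".getBytes)).get
      }
      assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause,
        thrown.getCause.isInstanceOf[InvalidTopicException])
    }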

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Apurva Mehta <apurva.1...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #2584 from ijuma/test-cannot-send-to-internal-topic-transient-failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0483a0b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0483a0b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0483a0b0

Branch: refs/heads/trunk
Commit: 0483a0b0b70e1ec7cd12c17f622f91ebc3932fc4
Parents: 151b236
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Fri Feb 24 11:04:37 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Feb 24 11:04:37 2017 -0800

----------------------------------------------------------------------
 .../kafka/api/IntegrationTestHarness.scala      |  7 +--
 .../kafka/api/ProducerFailureHandlingTest.scala | 15 +++---
 .../test/scala/unit/kafka/utils/TestUtils.scala | 51 ++++++++++++--------
 3 files changed, 39 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0483a0b0/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 10baa42..46465e8 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -75,12 +75,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       consumers += createNewConsumer
     }
 
-    // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName,
-      serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
-      serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
-      servers,
-      servers.head.groupCoordinator.offsetsTopicConfigs)
+    TestUtils.createOffsetsTopic(zkUtils, servers)
   }
 
   def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0483a0b0/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 5385bbe..ce81cae 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -123,7 +123,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
    val record = new ProducerRecord(topic10, null, "key".getBytes, new Array[Byte](maxMessageSize - 50))
     val recordMetadata = producer3.send(record).get
 
-    assertEquals(topic10, recordMetadata.topic())
+    assertEquals(topic10, recordMetadata.topic)
   }
 
  /** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */
@@ -176,8 +176,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   }
 
   /**
-    * Send with invalid partition id should throw KafkaException when partition is higher than the
-    * upper bound of partitions and IllegalArgumentException when partition is negative
+    * Send with invalid partition id should throw KafkaException when partition is higher than the upper bound of
+    * partitions.
     */
   @Test
   def testInvalidPartition() {
@@ -185,14 +185,14 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers)
 
    // create a record with incorrect partition id (higher than the number of partitions), send should fail
-    val higherRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 1, "key".getBytes, "value".getBytes)
+    val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes)
     intercept[KafkaException] {
       producer1.send(higherRecord)
     }
   }
 
   /**
-   * The send call after producer closed should throw KafkaException cased by IllegalStateException
+   * The send call after producer closed should throw IllegalStateException
    */
   @Test
   def testSendAfterClosed() {
@@ -218,14 +218,13 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
       producer3.close()
       producer3.send(record)
     }
-
-    // re-close producer is fine
   }
 
   @Test
   def testCannotSendToInternalTopic() {
+    TestUtils.createOffsetsTopic(zkUtils, servers)
     val thrown = intercept[ExecutionException] {
-      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+      producer2.send(new ProducerRecord(Topic.GroupMetadataTopicName, "test".getBytes, "test".getBytes)).get
     }
    assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException])
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0483a0b0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c16618b..4f6a204 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -20,42 +20,40 @@ package kafka.utils
 import java.io._
 import java.nio._
 import java.nio.channels._
-import java.util.concurrent.{Callable, Executors, TimeUnit}
-import java.util.Properties
+import java.nio.charset.Charset
 import java.security.cert.X509Certificate
+import java.util.Properties
+import java.util.concurrent.{Callable, Executors, TimeUnit}
 import javax.net.ssl.X509TrustManager
-import charset.Charset
 
-import kafka.security.auth.{Acl, Authorizer, Resource}
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.utils.Utils._
-import org.apache.kafka.test.TestSslUtils
-
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import kafka.server._
-import kafka.producer._
-import kafka.message._
+import kafka.admin.AdminUtils
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
+import kafka.common.{Topic, TopicAndPartition}
 import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
-import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
-import kafka.common.TopicAndPartition
-import kafka.admin.AdminUtils
 import kafka.log._
+import kafka.message._
+import kafka.producer._
+import kafka.security.auth.{Acl, Authorizer, Resource}
+import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
+import kafka.server._
 import kafka.utils.ZkUtils._
-import org.junit.Assert._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.test.{TestUtils => JTestUtils}
+import org.apache.kafka.common.utils.Utils._
+import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.junit.Assert._
 
-import scala.collection.Map
 import scala.collection.JavaConverters._
+import scala.collection.Map
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 /**
  * Utility functions to help with testing
@@ -271,6 +269,19 @@ object TestUtils extends Logging {
   }
 
   /**
+    * Create the consumer offsets/group metadata topic and wait until the leader is elected and metadata is propagated
+    * to all brokers.
+    */
+  def createOffsetsTopic(zkUtils: ZkUtils, servers: Seq[KafkaServer]): Unit = {
+    val server = servers.head
+    createTopic(zkUtils, Topic.GroupMetadataTopicName,
+      server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
+      server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
+      servers,
+      server.groupCoordinator.offsetsTopicConfigs)
+  }
+
+  /**
    * Create a test config for a consumer
    */
  def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String,
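
As a note on the new helper: the partition count and replication factor are
read from the broker config rather than hard-coded. A hypothetical sketch of
those lookups (offsetsTopicSettings is illustrative and not part of this
commit; the defaults shown assume the test does not override the properties):

    import kafka.server.{KafkaConfig, KafkaServer}

    // Illustrative only: the settings createOffsetsTopic reads from the first
    // broker's config.
    def offsetsTopicSettings(server: KafkaServer): (Int, Int) = {
      // offsets.topic.num.partitions, default 50
      val partitions = server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp)
      // offsets.topic.replication.factor is typed as a Short in KafkaConfig,
      // while createTopic expects an Int, hence the .toInt conversion.
      val replicationFactor = server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt
      (partitions, replicationFactor)
    }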
