Repository: kafka Updated Branches: refs/heads/trunk 151b23689 -> 0483a0b0b
MINOR: Fix transient failure of testCannotSendToInternalTopic It's a simple matter of creating the internal topic before trying to send to it. Otherwise, we could get an `UnknownTopicOrPartitionException` in some cases. Without the change, I could reproduce a failure in less than 5 runs. With the change, 30 consecutive runs succeeded. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Apurva Mehta <apurva.1...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #2584 from ijuma/test-cannot-send-to-internal-topic-transient-failure Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0483a0b0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0483a0b0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0483a0b0 Branch: refs/heads/trunk Commit: 0483a0b0b70e1ec7cd12c17f622f91ebc3932fc4 Parents: 151b236 Author: Ismael Juma <ism...@juma.me.uk> Authored: Fri Feb 24 11:04:37 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Feb 24 11:04:37 2017 -0800 ---------------------------------------------------------------------- .../kafka/api/IntegrationTestHarness.scala | 7 +-- .../kafka/api/ProducerFailureHandlingTest.scala | 15 +++--- .../test/scala/unit/kafka/utils/TestUtils.scala | 51 ++++++++++++-------- 3 files changed, 39 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0483a0b0/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 10baa42..46465e8 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -75,12 +75,7 @@ 
trait IntegrationTestHarness extends KafkaServerTestHarness { consumers += createNewConsumer } - // create the consumer offset topic - TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, - serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt, - serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, - servers, - servers.head.groupCoordinator.offsetsTopicConfigs) + TestUtils.createOffsetsTopic(zkUtils, servers) } def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/0483a0b0/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 5385bbe..ce81cae 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -123,7 +123,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { val record = new ProducerRecord(topic10, null, "key".getBytes, new Array[Byte](maxMessageSize - 50)) val recordMetadata = producer3.send(record).get - assertEquals(topic10, recordMetadata.topic()) + assertEquals(topic10, recordMetadata.topic) } /** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */ @@ -176,8 +176,8 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { } /** - * Send with invalid partition id should throw KafkaException when partition is higher than the - * upper bound of partitions and IllegalArgumentException when partition is negative + * Send with invalid partition id should throw KafkaException when partition is higher than the upper bound of + * partitions. 
*/ @Test def testInvalidPartition() { @@ -185,14 +185,14 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { TestUtils.createTopic(zkUtils, topic1, 1, numServers, servers) // create a record with incorrect partition id (higher than the number of partitions), send should fail - val higherRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1, 1, "key".getBytes, "value".getBytes) + val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes) intercept[KafkaException] { producer1.send(higherRecord) } } /** - * The send call after producer closed should throw KafkaException cased by IllegalStateException + * The send call after producer closed should throw IllegalStateException */ @Test def testSendAfterClosed() { @@ -218,14 +218,13 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { producer3.close() producer3.send(record) } - - // re-close producer is fine } @Test def testCannotSendToInternalTopic() { + TestUtils.createOffsetsTopic(zkUtils, servers) val thrown = intercept[ExecutionException] { - producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get + producer2.send(new ProducerRecord(Topic.GroupMetadataTopicName, "test".getBytes, "test".getBytes)).get } assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException]) } http://git-wip-us.apache.org/repos/asf/kafka/blob/0483a0b0/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c16618b..4f6a204 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -20,42 +20,40 @@ package kafka.utils import java.io._ import java.nio._ import 
java.nio.channels._ -import java.util.concurrent.{Callable, Executors, TimeUnit} -import java.util.Properties +import java.nio.charset.Charset import java.security.cert.X509Certificate +import java.util.Properties +import java.util.concurrent.{Callable, Executors, TimeUnit} import javax.net.ssl.X509TrustManager -import charset.Charset -import kafka.security.auth.{Acl, Authorizer, Resource} -import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.utils.Utils._ -import org.apache.kafka.test.TestSslUtils - -import scala.collection.mutable.{ArrayBuffer, ListBuffer} -import kafka.server._ -import kafka.producer._ -import kafka.message._ +import kafka.admin.AdminUtils import kafka.api._ import kafka.cluster.{Broker, EndPoint} +import kafka.common.{Topic, TopicAndPartition} import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream} -import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder} -import kafka.common.TopicAndPartition -import kafka.admin.AdminUtils import kafka.log._ +import kafka.message._ +import kafka.producer._ +import kafka.security.auth.{Acl, Authorizer, Resource} +import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder} +import kafka.server._ import kafka.utils.ZkUtils._ -import org.junit.Assert._ -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.{ListenerName, Mode} +import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.record._ import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer} import org.apache.kafka.common.utils.Time 
-import org.apache.kafka.test.{TestUtils => JTestUtils} +import org.apache.kafka.common.utils.Utils._ +import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils} +import org.junit.Assert._ -import scala.collection.Map import scala.collection.JavaConverters._ +import scala.collection.Map +import scala.collection.mutable.{ArrayBuffer, ListBuffer} /** * Utility functions to help with testing @@ -271,6 +269,19 @@ object TestUtils extends Logging { } /** + * Create the consumer offsets/group metadata topic and wait until the leader is elected and metadata is propagated + * to all brokers. + */ + def createOffsetsTopic(zkUtils: ZkUtils, servers: Seq[KafkaServer]): Unit = { + val server = servers.head + createTopic(zkUtils, Topic.GroupMetadataTopicName, + server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp), + server.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, + servers, + server.groupCoordinator.offsetsTopicConfigs) + } + + /** * Create a test config for a consumer */ def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String,