Repository: spark Updated Branches: refs/heads/master 5fd54b994 -> 9293734d3
http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala new file mode 100644 index 0000000..64bf503 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.concurrent.atomic.AtomicInteger + +import scala.util.Random + +import org.apache.kafka.clients.producer.RecordMetadata +import org.scalatest.BeforeAndAfter +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.test.SharedSQLContext + + +abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { + + protected var testUtils: KafkaTestUtils = _ + + override val streamingTimeout = 30.seconds + + override def beforeAll(): Unit = { + super.beforeAll() + testUtils = new KafkaTestUtils + testUtils.setup() + } + + override def afterAll(): Unit = { + if (testUtils != null) { + testUtils.teardown() + testUtils = null + super.afterAll() + } + } + + protected def makeSureGetOffsetCalled = AssertOnQuery { q => + // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure + // its "getOffset" is called before pushing any data. Otherwise, because of the race contion, + // we don't know which data should be fetched when `startingOffset` is latest. + q.processAllAvailable() + true + } + + /** + * Add data to Kafka. + * + * `topicAction` can be used to run actions for each topic before inserting data. + */ + case class AddKafkaData(topics: Set[String], data: Int*) + (implicit ensureDataInMultiplePartition: Boolean = false, + concurrent: Boolean = false, + message: String = "", + topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData { + + override def addData(query: Option[StreamExecution]): (Source, Offset) = { + if (query.get.isActive) { + // Make sure no Spark job is running when deleting a topic + query.get.processAllAvailable() + } + + val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap + val newTopics = topics.diff(existingTopics.keySet) + for (newTopic <- newTopics) { + topicAction(newTopic, None) + } + for (existingTopicPartitions <- existingTopics) { + topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2)) + } + + // Read all topics again in case some topics are delete. + val allTopics = testUtils.getAllTopicsAndPartitionSize().toMap.keys + require( + query.nonEmpty, + "Cannot add data when there is no query for finding the active kafka source") + + val sources = query.get.logicalPlan.collect { + case StreamingExecutionRelation(source, _) if source.isInstanceOf[KafkaSource] => + source.asInstanceOf[KafkaSource] + } + if (sources.isEmpty) { + throw new Exception( + "Could not find Kafka source in the StreamExecution logical plan to add data to") + } else if (sources.size > 1) { + throw new Exception( + "Could not select the Kafka source in the StreamExecution logical plan as there" + + "are multiple Kafka sources:\n\t" + sources.mkString("\n\t")) + } + val kafkaSource = sources.head + val topic = topics.toSeq(Random.nextInt(topics.size)) + val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray) + + def metadataToStr(m: (String, RecordMetadata)): String = { + s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}" + } + // Verify that the test data gets inserted into multiple partitions + if (ensureDataInMultiplePartition) { + require( + sentMetadata.groupBy(_._2.partition).size > 1, + s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}") + } + + val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics)) + logInfo(s"Added data, expected offset $offset") + (kafkaSource, offset) + } + + override def toString: String = + s"AddKafkaData(topics = $topics, data = $data, message = $message)" + } +} + + +class KafkaSourceSuite extends KafkaSourceTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + test("cannot stop Kafka stream") { + val topic = newTopic() + testUtils.createTopic(newTopic(), partitions = 5) + testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("subscribePattern", s"topic-.*") + + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped = kafka.map(kv => kv._2.toInt + 1) + + testStream(mapped)( + StopStream + ) + } + + test("subscribing topic by name from latest offsets") { + val topic = newTopic() + testFromLatestOffsets(topic, "subscribe" -> topic) + } + + test("subscribing topic by name from earliest offsets") { + val topic = newTopic() + testFromEarliestOffsets(topic, "subscribe" -> topic) + } + + test("subscribing topic by pattern from latest offsets") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-suffix" + testFromLatestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*") + } + + test("subscribing topic by pattern from earliest offsets") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-suffix" + testFromEarliestOffsets(topic, "subscribePattern" -> s"$topicPrefix-.*") + } + + test("subscribing topic by pattern with topic deletions") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-seems" + val topic2 = topicPrefix + "-bad" + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, Array("-1")) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("subscribePattern", s"$topicPrefix-.*") + .option("failOnDataLoss", "false") + + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped = kafka.map(kv => kv._2.toInt + 1) + + testStream(mapped)( + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 1, 2, 3), + CheckAnswer(2, 3, 4), + Assert { + testUtils.deleteTopic(topic) + testUtils.createTopic(topic2, partitions = 5) + true + }, + AddKafkaData(Set(topic2), 4, 5, 6), + CheckAnswer(2, 3, 4, 5, 6, 7) + ) + } + + test("bad source options") { + def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = { + val ex = intercept[IllegalArgumentException] { + val reader = spark + .readStream + .format("kafka") + options.foreach { case (k, v) => reader.option(k, v) } + reader.load() + } + expectedMsgs.foreach { m => + assert(ex.getMessage.toLowerCase.contains(m.toLowerCase)) + } + } + + // No strategy specified + testBadOptions()("options must be specified", "subscribe", "subscribePattern") + + // Multiple strategies specified + testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")( + "only one", "options can be specified") + + testBadOptions("subscribe" -> "")("no topics to subscribe") + testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") + } + + test("unsupported kafka configs") { + def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = { + val ex = intercept[IllegalArgumentException] { + val reader = spark + .readStream + .format("kafka") + .option("subscribe", "topic") + .option("kafka.bootstrap.servers", "somehost") + .option(s"$key", value) + reader.load() + } + assert(ex.getMessage.toLowerCase.contains("not supported")) + } + + testUnsupportedConfig("kafka.group.id") + testUnsupportedConfig("kafka.auto.offset.reset") + testUnsupportedConfig("kafka.enable.auto.commit") + testUnsupportedConfig("kafka.interceptor.classes") + testUnsupportedConfig("kafka.key.deserializer") + testUnsupportedConfig("kafka.value.deserializer") + + testUnsupportedConfig("kafka.auto.offset.reset", "none") + testUnsupportedConfig("kafka.auto.offset.reset", "someValue") + testUnsupportedConfig("kafka.auto.offset.reset", "earliest") + testUnsupportedConfig("kafka.auto.offset.reset", "latest") + } + + private def newTopic(): String = s"topic-${topicId.getAndIncrement()}" + + private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = { + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, Array("-1")) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark + .readStream + .format("kafka") + .option("startingOffset", s"latest") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + options.foreach { case (k, v) => reader.option(k, v) } + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped = kafka.map(kv => kv._2.toInt + 1) + + testStream(mapped)( + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 1, 2, 3), + CheckAnswer(2, 3, 4), + StopStream, + StartStream(), + CheckAnswer(2, 3, 4), // Should get the data back on recovery + StopStream, + AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped + StartStream(), + CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data + AddKafkaData(Set(topic), 7, 8), + CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), + AssertOnQuery("Add partitions") { query: StreamExecution => + testUtils.addPartitions(topic, 10) + true + }, + AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), + CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17) + ) + } + + private def testFromEarliestOffsets(topic: String, options: (String, String)*): Unit = { + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark.readStream + reader + .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$")) + .option("startingOffset", s"earliest") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + options.foreach { case (k, v) => reader.option(k, v) } + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped = kafka.map(kv => kv._2.toInt + 1) + + testStream(mapped)( + AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped + CheckAnswer(2, 3, 4, 5, 6, 7), + StopStream, + StartStream(), + CheckAnswer(2, 3, 4, 5, 6, 7), + StopStream, + AddKafkaData(Set(topic), 7, 8), + StartStream(), + CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), + AssertOnQuery("Add partitions") { query: StreamExecution => + testUtils.addPartitions(topic, 10) + true + }, + AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), + CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17) + ) + } +} + + +class KafkaSourceStressSuite extends KafkaSourceTest with BeforeAndAfter { + + import testImplicits._ + + val topicId = new AtomicInteger(1) + + @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic) + + def newStressTopic: String = s"stress${topicId.getAndIncrement()}" + + private def nextInt(start: Int, end: Int): Int = { + start + Random.nextInt(start + end - 1) + } + + after { + for (topic <- testUtils.getAllTopicsAndPartitionSize().toMap.keys) { + testUtils.deleteTopic(topic) + } + } + + test("stress test with multiple topics and partitions") { + topics.foreach { topic => + testUtils.createTopic(topic, partitions = nextInt(1, 6)) + testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray) + } + + // Create Kafka source that reads from latest offset + val kafka = + spark.readStream + .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$")) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("subscribePattern", "stress.*") + .option("failOnDataLoss", "false") + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + + val mapped = kafka.map(kv => kv._2.toInt + 1) + + runStressTest( + mapped, + Seq(makeSureGetOffsetCalled), + (d, running) => { + Random.nextInt(5) match { + case 0 => // Add a new topic + topics = topics ++ Seq(newStressTopic) + AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic", + topicAction = (topic, partition) => { + if (partition.isEmpty) { + testUtils.createTopic(topic, partitions = nextInt(1, 6)) + } + }) + case 1 if running => + // Only delete a topic when the query is running. Otherwise, we may lost data and + // cannot check the correctness. + val deletedTopic = topics(Random.nextInt(topics.size)) + if (deletedTopic != topics.head) { + topics = topics.filterNot(_ == deletedTopic) + } + AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic", + topicAction = (topic, partition) => { + // Never remove the first topic to make sure we have at least one topic + if (topic == deletedTopic && deletedTopic != topics.head) { + testUtils.deleteTopic(deletedTopic) + } + }) + case 2 => // Add new partitions + AddKafkaData(topics.toSet, d: _*)(message = "Add partitiosn", + topicAction = (topic, partition) => { + testUtils.addPartitions(topic, partition.get + nextInt(1, 6)) + }) + case _ => // Just add new data + AddKafkaData(topics.toSet, d: _*) + } + }, + iterations = 50) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala new file mode 100644 index 0000000..3eb8a73 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.io.File +import java.lang.{Integer => JInt} +import java.net.InetSocketAddress +import java.util.{Map => JMap, Properties} +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.language.postfixOps +import scala.util.Random + +import kafka.admin.AdminUtils +import kafka.api.Request +import kafka.common.TopicAndPartition +import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint} +import kafka.utils.ZkUtils +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils +import org.apache.spark.SparkConf + +/** + * This is a helper class for Kafka test suites. This has the functionality to set up + * and tear down local Kafka servers, and to push data using Kafka producers. + * + * The reason to put Kafka test utility class in src is to test Python related Kafka APIs. + */ +class KafkaTestUtils extends Logging { + + // Zookeeper related configurations + private val zkHost = "localhost" + private var zkPort: Int = 0 + private val zkConnectionTimeout = 60000 + private val zkSessionTimeout = 6000 + + private var zookeeper: EmbeddedZookeeper = _ + + private var zkUtils: ZkUtils = _ + + // Kafka broker related configurations + private val brokerHost = "localhost" + private var brokerPort = 0 + private var brokerConf: KafkaConfig = _ + + // Kafka broker server + private var server: KafkaServer = _ + + // Kafka producer + private var producer: Producer[String, String] = _ + + // Flag to test whether the system is correctly started + private var zkReady = false + private var brokerReady = false + + def zkAddress: String = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") + s"$zkHost:$zkPort" + } + + def brokerAddress: String = { + assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address") + s"$brokerHost:$brokerPort" + } + + def zookeeperClient: ZkUtils = { + assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client") + Option(zkUtils).getOrElse( + throw new IllegalStateException("Zookeeper client is not yet initialized")) + } + + // Set up the Embedded Zookeeper server and get the proper Zookeeper port + private def setupEmbeddedZookeeper(): Unit = { + // Zookeeper server startup + zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") + // Get the actual zookeeper binding port + zkPort = zookeeper.actualPort + zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false) + zkReady = true + } + + // Set up the Embedded Kafka server + private def setupEmbeddedKafkaServer(): Unit = { + assert(zkReady, "Zookeeper should be set up beforehand") + + // Kafka broker startup + Utils.startServiceOnPort(brokerPort, port => { + brokerPort = port + brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) + server = new KafkaServer(brokerConf) + server.startup() + brokerPort = server.boundPort() + (server, brokerPort) + }, new SparkConf(), "KafkaBroker") + + brokerReady = true + } + + /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ + def setup(): Unit = { + setupEmbeddedZookeeper() + setupEmbeddedKafkaServer() + } + + /** Teardown the whole servers, including Kafka broker and Zookeeper */ + def teardown(): Unit = { + brokerReady = false + zkReady = false + + if (producer != null) { + producer.close() + producer = null + } + + if (server != null) { + server.shutdown() + server = null + } + + brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + + if (zkUtils != null) { + zkUtils.close() + zkUtils = null + } + + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null + } + } + + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String, partitions: Int): Unit = { + AdminUtils.createTopic(zkUtils, topic, partitions, 1) + // wait until metadata is propagated + (0 until partitions).foreach { p => + waitUntilMetadataIsPropagated(topic, p) + } + } + + def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = { + zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq + } + + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String): Unit = { + createTopic(topic, 1) + } + + /** Delete a Kafka topic and wait until it is propagated to the whole cluster */ + def deleteTopic(topic: String): Unit = { + val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size + AdminUtils.deleteTopic(zkUtils, topic) + verifyTopicDeletion(zkUtils, topic, partitions, List(this.server)) + } + + /** Add new paritions to a Kafka topic */ + def addPartitions(topic: String, partitions: Int): Unit = { + AdminUtils.addPartitions(zkUtils, topic, partitions) + // wait until metadata is propagated + (0 until partitions).foreach { p => + waitUntilMetadataIsPropagated(topic, p) + } + } + + /** Java-friendly function for sending messages to the Kafka broker */ + def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { + sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) + } + + /** Send the messages to the Kafka broker */ + def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = { + val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray + sendMessages(topic, messages) + } + + /** Send the array of messages to the Kafka broker */ + def sendMessages(topic: String, messages: Array[String]): Seq[(String, RecordMetadata)] = { + producer = new KafkaProducer[String, String](producerConfiguration) + val offsets = try { + messages.map { m => + val metadata = + producer.send(new ProducerRecord[String, String](topic, m)).get(10, TimeUnit.SECONDS) + logInfo(s"\tSent $m to partition ${metadata.partition}, offset ${metadata.offset}") + (m, metadata) + } + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + offsets + } + + def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = { + val kc = new KafkaConsumer[String, String](consumerConfiguration) + logInfo("Created consumer to get latest offsets") + kc.subscribe(topics.asJavaCollection) + kc.poll(0) + val partitions = kc.assignment() + kc.pause(partitions) + kc.seekToEnd(partitions) + val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap + kc.close() + logInfo("Closed consumer to get latest offsets") + offsets + } + + private def brokerConfiguration: Properties = { + val props = new Properties() + props.put("broker.id", "0") + props.put("host.name", "localhost") + props.put("advertised.host.name", "localhost") + props.put("port", brokerPort.toString) + props.put("log.dir", Utils.createTempDir().getAbsolutePath) + props.put("zookeeper.connect", zkAddress) + props.put("log.flush.interval.messages", "1") + props.put("replica.socket.timeout.ms", "1500") + props.put("delete.topic.enable", "true") + props + } + + private def producerConfiguration: Properties = { + val props = new Properties() + props.put("bootstrap.servers", brokerAddress) + props.put("value.serializer", classOf[StringSerializer].getName) + props.put("key.serializer", classOf[StringSerializer].getName) + // wait for all in-sync replicas to ack sends + props.put("acks", "all") + props + } + + private def consumerConfiguration: Properties = { + val props = new Properties() + props.put("bootstrap.servers", brokerAddress) + props.put("group.id", "group-KafkaTestUtils-" + Random.nextInt) + props.put("value.deserializer", classOf[StringDeserializer].getName) + props.put("key.deserializer", classOf[StringDeserializer].getName) + props.put("enable.auto.commit", "false") + props + } + + private def verifyTopicDeletion( + zkUtils: ZkUtils, + topic: String, + numPartitions: Int, + servers: Seq[KafkaServer]) { + import ZkUtils._ + val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) + def isDeleted(): Boolean = { + // wait until admin path for delete topic is deleted, signaling completion of topic deletion + val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic)) + val topicPath = !zkUtils.pathExists(getTopicPath(topic)) + // ensure that the topic-partition has been deleted from all brokers' replica managers + val replicaManager = servers.forall(server => topicAndPartitions.forall(tp => + server.replicaManager.getPartition(tp.topic, tp.partition) == None)) + // ensure that logs from all replicas are deleted if delete topic is marked successful + val logManager = servers.forall(server => topicAndPartitions.forall(tp => + server.getLogManager().getLog(tp).isEmpty)) + // ensure that topic is removed from all cleaner offsets + val cleaner = servers.forall(server => topicAndPartitions.forall { tp => + val checkpoints = server.getLogManager().logDirs.map { logDir => + new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read() + } + checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp)) + }) + deletePath && topicPath && replicaManager && logManager && cleaner + } + eventually(timeout(10.seconds)) { + assert(isDeleted, s"$topic not deleted after timeout") + } + } + + private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { + def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { + case Some(partitionState) => + val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr + + zkUtils.getLeaderForPartition(topic, partition).isDefined && + Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && + leaderAndInSyncReplicas.isr.size >= 1 + + case _ => + false + } + eventually(timeout(10.seconds)) { + assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout") + } + } + + private class EmbeddedZookeeper(val zkConnect: String) { + val snapshotDir = Utils.createTempDir() + val logDir = Utils.createTempDir() + + val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500) + val (ip, port) = { + val splits = zkConnect.split(":") + (splits(0), splits(1).toInt) + } + val factory = new NIOServerCnxnFactory() + factory.configure(new InetSocketAddress(ip, port), 16) + factory.startup(zookeeper) + + val actualPort = factory.getLocalPort + + def shutdown() { + factory.shutdown() + Utils.deleteRecursively(snapshotDir) + Utils.deleteRecursively(logDir) + } + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8408f4b..37976b0 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,7 @@ <module>external/kafka-0-8-assembly</module> <module>external/kafka-0-10</module> <module>external/kafka-0-10-assembly</module> + <module>external/kafka-0-10-sql</module> </modules> <properties> http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 8e47e7f..88d5dc9 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -39,8 +39,8 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile - val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer) = Seq( - "catalyst", "sql", "hive", "hive-thriftserver" + val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010) = Seq( + "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10" ).map(ProjectRef(buildLocation, _)) val streamingProjects@Seq( @@ -353,7 +353,7 @@ object SparkBuild extends PomBuild { val mimaProjects = allProjects.filterNot { x => Seq( spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn, - unsafe, tags + unsafe, tags, sqlKafka010 ).contains(x) } http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9825f19..b3a0d6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -116,7 +116,7 @@ class StreamExecution( * [[HDFSMetadataLog]]. See SPARK-14131 for more details. */ val microBatchThread = - new UninterruptibleThread(s"stream execution thread for $name") { + new StreamExecutionThread(s"stream execution thread for $name") { override def run(): Unit = { // To fix call site like "run at <unknown>:0", we bridge the call site from the caller // thread to this micro batch thread @@ -530,3 +530,9 @@ object StreamExecution { def nextId: Long = _nextId.getAndIncrement() } + +/** + * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread + * and will use `classOf[StreamExecutionThread]` to check. + */ +abstract class StreamExecutionThread(name: String) extends UninterruptibleThread(name) http://git-wip-us.apache.org/repos/asf/spark/blob/9293734d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index aa6515b..09140a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} * * {{{ * val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map(_ + 1) - - testStream(mapped)( - AddData(inputData, 1, 2, 3), - CheckAnswer(2, 3, 4)) + * val mapped = inputData.toDS().map(_ + 1) + * + * testStream(mapped)( + * AddData(inputData, 1, 2, 3), + * CheckAnswer(2, 3, 4)) * }}} * * Note that while we do sleep to allow the other thread to progress without spinning, @@ -477,21 +477,41 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } + + /** + * Creates a stress test that randomly starts/stops/adds data/checks the result. + * + * @param ds a dataframe that executes + 1 on a stream of integers, returning the result + * @param addData an add data action that adds the given numbers to the stream, encoding them + * as needed + * @param iterations the iteration number + */ + def runStressTest( + ds: Dataset[Int], + addData: Seq[Int] => StreamAction, + iterations: Int = 100): Unit = { + runStressTest(ds, Seq.empty, (data, running) => addData(data), iterations) + } + /** * Creates a stress test that randomly starts/stops/adds data/checks the result. * - * @param ds a dataframe that executes + 1 on a stream of integers, returning the result. - * @param addData and add data action that adds the given numbers to the stream, encoding them + * @param ds a dataframe that executes + 1 on a stream of integers, returning the result + * @param prepareActions actions need to run before starting the stress test. + * @param addData an add data action that adds the given numbers to the stream, encoding them * as needed + * @param iterations the iteration number */ def runStressTest( ds: Dataset[Int], - addData: Seq[Int] => StreamAction, - iterations: Int = 100): Unit = { + prepareActions: Seq[StreamAction], + addData: (Seq[Int], Boolean) => StreamAction, + iterations: Int): Unit = { implicit val intEncoder = ExpressionEncoder[Int]() var dataPos = 0 var running = true val actions = new ArrayBuffer[StreamAction]() + actions ++= prepareActions def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) } @@ -499,7 +519,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val numItems = Random.nextInt(10) val data = dataPos until (dataPos + numItems) dataPos += numItems - actions += addData(data) + actions += addData(data, running) } (1 to iterations).foreach { i => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org