GitHub user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22138#discussion_r214797928
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+ type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+ private val dummyBytes = "dummy".getBytes
+
+ test("acquire fresh one") {
+ val dataPool = FetchedDataPool.build
+
+ val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+ assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+ val data = dataPool.acquire(cacheKey, 0)
+
+ assert(dataPool.getCache(cacheKey).size === 1)
+ assert(dataPool.getCache(cacheKey).head.inUse)
+
+ data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+ dataPool.release(cacheKey, data)
+
+ assert(dataPool.getCache(cacheKey).size === 1)
+ assert(!dataPool.getCache(cacheKey).head.inUse)
+
+ dataPool.shutdown()
+ }
+
+ test("acquire fetched data from multiple keys") {
+ val dataPool = FetchedDataPool.build
+
+ val cacheKeys = (0 to 10).map { partId =>
+ CacheKey("testgroup", new TopicPartition("topic", partId))
+ }
+
+ assert(dataPool.getCache.size === 0)
+ cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+ val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+ assert(dataPool.getCache.size === cacheKeys.size)
+ cacheKeys.foreach { key =>
+ assert(dataPool.getCache(key).size === 1)
+ assert(dataPool.getCache(key).head.inUse)
+ }
+
+ dataList.foreach { case (_, data) =>
+ data.withNewPoll(testRecords(0, 5).listIterator, 5)
+ }
+
+ dataList.foreach { case (key, data) =>
+ dataPool.release(key, data)
+ }
+
+ assert(dataPool.getCache.size === cacheKeys.size)
+ cacheKeys.foreach { key =>
+ assert(dataPool.getCache(key).size === 1)
+ assert(!dataPool.getCache(key).head.inUse)
+ }
+
+ dataPool.shutdown()
+ }
+
+ test("continuous use of fetched data from single key") {
+ val dataPool = FetchedDataPool.build
+
+ val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+ assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+ val data = dataPool.acquire(cacheKey, 0)
+
+ assert(dataPool.getCache(cacheKey).size === 1)
+ assert(dataPool.getCache(cacheKey).head.inUse)
+
+ data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+ (0 to 3).foreach { _ => data.next() }
+
+ dataPool.release(cacheKey, data)
+
+ // suppose next batch
+
+ val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+ assert(data.eq(data2))
+
+ assert(dataPool.getCache(cacheKey).size === 1)
+ assert(dataPool.getCache(cacheKey).head.inUse)
+
+ dataPool.release(cacheKey, data2)
+
+ assert(dataPool.getCache(cacheKey).size === 1)
+ assert(!dataPool.getCache(cacheKey).head.inUse)
+
+ dataPool.shutdown()
+ }
+
+ test("multiple tasks referring same key continuously using fetched
data") {
+ val dataPool = FetchedDataPool.build
+
+ val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+ assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+ val dataFromTask1 = dataPool.acquire(cacheKey, 0)
+
+ assert(dataPool.getCache(cacheKey).size === 1)
+ assert(dataPool.getCache(cacheKey).head.inUse)
+
+ val dataFromTask2 = dataPool.acquire(cacheKey, 0)
+
+ // it shouldn't return the same object as dataFromTask1 even though it asks for the same offset
+ // it definitely works when offsets don't overlap: skip adding a test for that
+ assert(dataPool.getCache(cacheKey).size === 2)
+ assert(dataPool.getCache(cacheKey)(1).inUse)
+
+ // reading from task 1
+ dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+ (0 to 3).foreach { _ => dataFromTask1.next() }
+
+ dataPool.release(cacheKey, dataFromTask1)
+
+ // reading from task 2
+ dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
+
+ (0 to 5).foreach { _ => dataFromTask2.next() }
+
+ dataPool.release(cacheKey, dataFromTask2)
+
+ // suppose next batch for task 1
+ val data2FromTask1 = dataPool.acquire(cacheKey, dataFromTask1.nextOffsetInFetchedData)
+ assert(data2FromTask1.eq(dataFromTask1))
+
+ assert(dataPool.getCache(cacheKey).head.inUse)
+
+ // suppose next batch for task 2
+ val data2FromTask2 = dataPool.acquire(cacheKey, dataFromTask2.nextOffsetInFetchedData)
+ assert(data2FromTask2.eq(dataFromTask2))
+
+ assert(dataPool.getCache(cacheKey)(1).inUse)
+
+ // release from task 2
+ dataPool.release(cacheKey, data2FromTask2)
+ assert(!dataPool.getCache(cacheKey)(1).inUse)
+
+ // release from task 1
+ dataPool.release(cacheKey, data2FromTask1)
+ assert(!dataPool.getCache(cacheKey).head.inUse)
+
+ dataPool.shutdown()
+ }
+
+ test("evict idle fetched data") {
--- End diff ---

This stays in an infinite loop if `startEvictorThread` throws an exception.
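
For illustration, a minimal sketch of how the startup retry could be bounded so a persistently failing start doesn't spin forever (the `startEvictorThread` stand-in and the retry helper are assumptions for the sketch, not the actual `FetchedDataPool` internals):

```scala
import scala.util.control.NonFatal

object EvictorStartupSketch {
  // Stand-in for the real evictor-thread startup; assumed to possibly throw.
  private def startEvictorThread(): Unit =
    throw new IllegalStateException("simulated startup failure")

  // Retry a bounded number of times instead of looping until success,
  // so a persistently failing startEvictorThread cannot spin forever.
  def startWithRetry(maxAttempts: Int = 3): Unit = {
    var attempts = 0
    var started = false
    while (!started && attempts < maxAttempts) {
      try {
        startEvictorThread()
        started = true
      } catch {
        case NonFatal(e) =>
          attempts += 1
          if (attempts == maxAttempts) throw e // give up and surface the error
      }
    }
  }
}
```

Rethrowing on the last attempt also surfaces the failure to the caller instead of leaving the pool running without an evictor.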