GitHub user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20572#discussion_r170279504
--- Diff:
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with
BeforeAndAfterAll {
private val preferredHosts = LocationStrategies.PreferConsistent
+ private def compactLogs(topic: String, partition: Int, messages:
Array[(String, String)]) {
+ val mockTime = new MockTime()
+ // LogCleaner in 0.10 version of Kafka is still expecting the old
TopicAndPartition api
+ val logs = new Pool[TopicAndPartition, Log]()
+ val logDir = kafkaTestUtils.brokerLogDir
+ val dir = new java.io.File(logDir, topic + "-" + partition)
+ dir.mkdirs()
+ val logProps = new ju.Properties()
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f:
java.lang.Float)
+ val log = new Log(
+ dir,
+ LogConfig(logProps),
+ 0L,
+ mockTime.scheduler,
+ mockTime
+ )
+ messages.foreach { case (k, v) =>
+ val msg = new ByteBufferMessageSet(
--- End diff --
Unindent this by one level?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org