Github user ijuma commented on a diff in the pull request:
https://github.com/apache/spark/pull/21955#discussion_r207318249
--- Diff:
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
---
@@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with
BeforeAndAfterAll {
private def compactLogs(topic: String, partition: Int, messages:
Array[(String, String)]) {
val mockTime = new MockTime()
- // LogCleaner in 0.10 version of Kafka is still expecting the old
TopicAndPartition api
- val logs = new Pool[TopicAndPartition, Log]()
+ val logs = new Pool[TopicPartition, Log]()
val logDir = kafkaTestUtils.brokerLogDir
val dir = new File(logDir, topic + "-" + partition)
dir.mkdirs()
val logProps = new ju.Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
logProps.put(LogConfig.MinCleanableDirtyRatioProp,
java.lang.Float.valueOf(0.1f))
+ // TODO is this new Log declaration correct?
+ val logDirFailureChannel = new LogDirFailureChannel(0)
val log = new Log(
dir,
LogConfig(logProps),
0L,
+ 0L,
mockTime.scheduler,
- mockTime
+ new BrokerTopicStats(),
+ mockTime,
+ Int.MaxValue,
+ Int.MaxValue,
+ new TopicPartition(topic, partition),
+ new ProducerStateManager(new TopicPartition(topic, partition), dir),
+ logDirFailureChannel
)
messages.foreach { case (k, v) =>
- val msg = new ByteBufferMessageSet(
- NoCompressionCodec,
- new Message(v.getBytes, k.getBytes, Message.NoTimestamp,
Message.CurrentMagicValue))
- log.append(msg)
+ val records = new MemoryRecords()
--- End diff --
```java
public static MemoryRecords withRecords(CompressionType compressionType,
SimpleRecord... records) {
```
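Maybe you can use the `MemoryRecords.withRecords` factory method shown above, instead of calling `new MemoryRecords()` directly?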
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]