Github user ijuma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21955#discussion_r207318249
  
    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
    @@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
     
      private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
         val mockTime = new MockTime()
    -    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
    -    val logs = new Pool[TopicAndPartition, Log]()
    +    val logs = new Pool[TopicPartition, Log]()
         val logDir = kafkaTestUtils.brokerLogDir
         val dir = new File(logDir, topic + "-" + partition)
         dir.mkdirs()
         val logProps = new ju.Properties()
         logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
        logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
    +    // TODO is this new Log declaration correct?
    +    val logDirFailureChannel = new LogDirFailureChannel(0)
         val log = new Log(
           dir,
           LogConfig(logProps),
           0L,
    +      0L,
           mockTime.scheduler,
    -      mockTime
    +      new BrokerTopicStats(),
    +      mockTime,
    +      Int.MaxValue,
    +      Int.MaxValue,
    +      new TopicPartition(topic, partition),
    +      new ProducerStateManager(new TopicPartition(topic, partition), dir),
    +      logDirFailureChannel
         )
         messages.foreach { case (k, v) =>
    -      val msg = new ByteBufferMessageSet(
    -        NoCompressionCodec,
    -        new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
    -      log.append(msg)
    +      val records = new MemoryRecords()
    --- End diff --
    
    ```java
    public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
    ```
    Maybe you can use the factory method above instead of `new MemoryRecords()`?
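    
    For illustration, a minimal sketch of how that factory could slot into `compactLogs`, assuming the Kafka 2.x `SimpleRecord`, `CompressionType`, and `Log.appendAsLeader` APIs are on the test classpath (the `appendAsLeader` call is my assumption, not something shown in this diff):
    
    ```scala
    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
    
    messages.foreach { case (k, v) =>
      // Build the batch through the public factory instead of new MemoryRecords()
      val records = MemoryRecords.withRecords(
        CompressionType.NONE,
        new SimpleRecord(k.getBytes, v.getBytes))
      // Assumption: with the Kafka 2.x Log API, the old log.append(msg) call
      // becomes appendAsLeader(records, leaderEpoch)
      log.appendAsLeader(records, leaderEpoch = 0)
    }
    ```
    
    Note the argument order: `SimpleRecord` takes key first and value second, whereas the old `Message` constructor being removed took the value first.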


---
