Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r208680325
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala ---
    @@ -162,23 +141,52 @@ case class KafkaContinuousInputPartition(
         startOffset: Long,
         kafkaParams: ju.Map[String, Object],
         pollTimeoutMs: Long,
    -    failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] 
{
    +    failOnDataLoss: Boolean) extends InputPartition
     
    +object KafkaContinuousReaderFactory extends 
ContinuousPartitionReaderFactory {
       override def createContinuousReader(
    -      offset: PartitionOffset): InputPartitionReader[InternalRow] = {
    -    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
    -    require(kafkaOffset.topicPartition == topicPartition,
    -      s"Expected topicPartition: $topicPartition, but got: 
${kafkaOffset.topicPartition}")
    -    new KafkaContinuousInputPartitionReader(
    -      topicPartition, kafkaOffset.partitionOffset, kafkaParams, 
pollTimeoutMs, failOnDataLoss)
    +      partition: InputPartition): ContinuousPartitionReader[InternalRow] = 
{
    +    val p = partition.asInstanceOf[KafkaContinuousInputPartition]
    +    new KafkaContinuousPartitionReader(
    +      p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, 
p.failOnDataLoss)
       }
    +}
    +
    +class KafkaContinuousScanConfigBuilder(
    +    schema: StructType,
    +    startOffset: Offset,
    +    offsetReader: KafkaOffsetReader,
    +    reportDataLoss: String => Unit)
    +  extends ScanConfigBuilder {
    +
    +  override def build(): ScanConfig = {
    +    val oldStartPartitionOffsets = 
KafkaSourceOffset.getPartitionOffsets(startOffset)
    --- End diff --
    
    Moved from https://github.com/apache/spark/pull/22009/files#diff-b35752a92e5ab595a6360d6123c7b7b8L93


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to