Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22009#discussion_r208680325
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
---
@@ -162,23 +141,52 @@ case class KafkaContinuousInputPartition(
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
- failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] {
+ failOnDataLoss: Boolean) extends InputPartition
+object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
override def createContinuousReader(
- offset: PartitionOffset): InputPartitionReader[InternalRow] = {
- val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
- require(kafkaOffset.topicPartition == topicPartition,
- s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
- new KafkaContinuousInputPartitionReader(
- topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
+ partition: InputPartition): ContinuousPartitionReader[InternalRow] = {
+ val p = partition.asInstanceOf[KafkaContinuousInputPartition]
+ new KafkaContinuousPartitionReader(
+ p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, p.failOnDataLoss)
}
+}
+
+class KafkaContinuousScanConfigBuilder(
+ schema: StructType,
+ startOffset: Offset,
+ offsetReader: KafkaOffsetReader,
+ reportDataLoss: String => Unit)
+ extends ScanConfigBuilder {
+
+ override def build(): ScanConfig = {
+ val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
--- End diff --
Moved from
https://github.com/apache/spark/pull/22009/files#diff-b35752a92e5ab595a6360d6123c7b7b8L93
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]