Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/22547#discussion_r226783272
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -106,85 +107,96 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
      failOnDataLoss(caseInsensitiveParams))
  }
-  /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
-   * batches of Kafka data in a micro-batch streaming query.
-   */
-  override def createMicroBatchReadSupport(
-      metadataPath: String,
-      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
-
-    val parameters = options.asMap().asScala.toMap
-    validateStreamOptions(parameters)
-    // Each running query should use its own group id. Otherwise, the query may be only assigned
-    // partial data since Kafka will assign partitions to multiple consumers having the same group
-    // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-    val specifiedKafkaParams =
-      parameters
-        .keySet
-        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-        .map { k => k.drop(6).toString -> parameters(k) }
-        .toMap
-
-    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
-      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
-
-    val kafkaOffsetReader = new KafkaOffsetReader(
-      strategy(caseInsensitiveParams),
-      kafkaParamsForDriver(specifiedKafkaParams),
-      parameters,
-      driverGroupIdPrefix = s"$uniqueGroupId-driver")
-
-    new KafkaMicroBatchReadSupport(
-      kafkaOffsetReader,
-      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
-      options,
-      metadataPath,
-      startingStreamOffsets,
-      failOnDataLoss(caseInsensitiveParams))
+  override def getTable(options: DataSourceOptions): KafkaTable.type = {
+    KafkaTable
  }
-  /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
-   * Kafka data in a continuous streaming query.
-   */
-  override def createContinuousReadSupport(
-      metadataPath: String,
-      options: DataSourceOptions): KafkaContinuousReadSupport = {
-    val parameters = options.asMap().asScala.toMap
-    validateStreamOptions(parameters)
-    // Each running query should use its own group id. Otherwise, the query may be only assigned
-    // partial data since Kafka will assign partitions to multiple consumers having the same group
-    // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-    val specifiedKafkaParams =
-      parameters
-        .keySet
-        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-        .map { k => k.drop(6).toString -> parameters(k) }
-        .toMap
+  object KafkaTable extends Table
--- End diff ---
Why is `KafkaTable` an object, not a class? This doesn't seem to fit the abstraction.
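
For illustration only, here is a minimal sketch of the class-based shape this comment is hinting at. The `Table` trait below is a local stand-in for the DataSourceV2 table abstraction under discussion, and the constructor parameter and the commented `getTable` signature are assumptions, not the PR's actual API:

    // Sketch only: `Table` is a stand-in for the DataSourceV2 table
    // abstraction this PR introduces, not the real interface.
    trait Table {
      def name(): String
    }

    // A class lets each table instance carry the options it was resolved
    // with, instead of every query sharing one stateless singleton object.
    class KafkaTable(sourceOptions: Map[String, String]) extends Table {
      override def name(): String = "KafkaTable"
    }

    // The provider could then return a fresh instance per call, e.g.
    // (signature assumed for illustration):
    //   override def getTable(options: DataSourceOptions): Table =
    //     new KafkaTable(options.asMap().asScala.toMap)

The design point is that a per-query instance can hold the state the removed methods computed (resolved options, Kafka params, the unique group id), whereas a shared object cannot without re-deriving it or holding shared state.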