[SPARK-24882][SQL] improve data source v2 API

## What changes were proposed in this pull request?

Improve the data source v2 API according to the [design doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing)

summary of the changes
1. rename `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> `InputPartitionReader` to `BatchReadSupportProvider` -> `BatchReadSupport` -> `InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar renaming also happens in the streaming and write APIs.
2. create `ScanConfig` to store query-specific information like the operator pushdown result, streaming offsets, etc. This makes the batch and streaming `ReadSupport` (previously named `DataSourceReader`) immutable. All other methods take `ScanConfig` as input, which implies that applying operator pushdown and getting streaming offsets happen before everything else (getting input partitions, reporting statistics, etc.).
3. separate `InputPartition` into `InputPartition` and `PartitionReaderFactory`. This is a natural separation: data splitting and reading are orthogonal and we should not mix them in one interface. This also makes the naming consistent between the read and write APIs: `PartitionReaderFactory` vs `DataWriterFactory`.
4. separate the batch and streaming interfaces. Sometimes it's painful to force the streaming interface to extend the batch interface, as we may need to override some batch methods to return false, or even leak the streaming concept into the batch API (e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`).

Some follow-ups we should do after this PR (tracked by https://issues.apache.org/jira/browse/SPARK-25186 ):
1. Revisit the life cycle of `ReadSupport` instances. Currently I keep it the same as the previous `DataSourceReader`, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be perfect for batch sources. We can also consider letting `ReadSupport.newScanConfigBuilder` take `DataSourceOptions` as a parameter, if we decide to change the life cycle.
2. Add `WriteConfig`. This is similar to `ScanConfig` and makes the write API more flexible. But it's only needed when we add the `replaceWhere` support, and it requires changing the streaming execution engine for this new concept, which I think is better done in another PR.
3. Refine the documentation. This PR adds/changes a lot of documentation and it's very likely that some people have better ideas.
4. Figure out the life cycle of `CustomMetrics`. It looks to me that it should be bound to a `ScanConfig`, but we need to change `ProgressReporter` to get the `ScanConfig`. Better done in another PR.
5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. using the `SupportsPushdownXYZ` traits. We can design a better API using the builder pattern, but this is a complicated design and deserves an individual JIRA ticket and design doc.
6. Improve the continuous streaming engine to only create a new `ScanConfig` when re-configuring.
7. Remove `SupportsPushdownCatalystFilter`. This is actually not a must-have for the file source; we can change the hive partition pruning to use the public `Filter`.

## How was this patch tested?

existing tests.

Closes #22009 from cloud-fan/redesign.

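The read path described in the summary (build a `ScanConfig` first, then pass it to every planning call, and keep partition planning separate from row reading) can be sketched roughly as below. This is only an illustration: the traits are simplified stand-ins written for this example, not Spark's real interfaces, and `RangeReadSupport`, `RangeScanConfig`, `RangePartition`, `RangeReaderFactory` and `ScanSketch` are hypothetical names.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark's real interfaces.
trait ScanConfig { def readSchema: Seq[String] }
trait ScanConfigBuilder { def build(): ScanConfig }
trait InputPartition extends Serializable
trait PartitionReader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}
trait PartitionReaderFactory extends Serializable {
  def createReader(partition: InputPartition): PartitionReader[Int]
}
trait BatchReadSupport {
  def fullSchema(): Seq[String]
  def newScanConfigBuilder(): ScanConfigBuilder
  def planInputPartitions(config: ScanConfig): Array[InputPartition]
  def createReaderFactory(config: ScanConfig): PartitionReaderFactory
}

// Hypothetical source that produces the integers [0, total), split into ranges.
case class RangeScanConfig(readSchema: Seq[String], numPartitions: Int) extends ScanConfig
case class RangePartition(start: Int, end: Int) extends InputPartition

// Reading lives in the factory; the partition itself is plain data.
object RangeReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[Int] = {
    val p = partition.asInstanceOf[RangePartition]
    new PartitionReader[Int] {
      private var current = p.start - 1
      override def next(): Boolean = { current += 1; current < p.end }
      override def get(): Int = current
      override def close(): Unit = ()
    }
  }
}

// The read support keeps no per-query state: everything query-specific is in the config.
class RangeReadSupport(total: Int) extends BatchReadSupport {
  override def fullSchema(): Seq[String] = Seq("value")
  override def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder {
    override def build(): ScanConfig = RangeScanConfig(fullSchema(), numPartitions = 2)
  }
  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
    val n = config.asInstanceOf[RangeScanConfig].numPartitions
    val step = math.max(1, (total + n - 1) / n)
    (0 until total by step)
      .map(s => RangePartition(s, math.min(s + step, total)): InputPartition)
      .toArray
  }
  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory =
    RangeReaderFactory
}

object ScanSketch {
  // Driver-side flow: config first, then partitions and factory, then per-partition readers.
  def readAll(source: BatchReadSupport): Seq[Int] = {
    val config = source.newScanConfigBuilder().build()
    val factory = source.createReaderFactory(config)
    source.planInputPartitions(config).toSeq.flatMap { partition =>
      val reader = factory.createReader(partition)
      try {
        val rows = Seq.newBuilder[Int]
        while (reader.next()) rows += reader.get()
        rows.result()
      } finally reader.close()
    }
  }
}
```

In this sketch `ScanSketch.readAll(new RangeReadSupport(5))` walks both planned partitions in order. In Spark itself the factory and partitions would be serialized and sent to executors rather than used in a single loop.
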
Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7548871
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7548871
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7548871

Branch: refs/heads/master
Commit: e754887182304ad0d622754e33192ebcdd515965
Parents: 55f3664
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Aug 22 00:10:55 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed Aug 22 00:10:55 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousReadSupport.scala | 255 +++++++++++
 .../sql/kafka010/KafkaContinuousReader.scala | 248 -----------
 .../kafka010/KafkaMicroBatchReadSupport.scala | 401 +++++++++++++++++
 .../sql/kafka010/KafkaMicroBatchReader.scala | 402 -----------------
 .../sql/kafka010/KafkaSourceProvider.scala | 37 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala | 118 -----
 .../kafka010/KafkaStreamingWriteSupport.scala | 118 +++++
 .../kafka010/KafkaContinuousSourceSuite.scala | 8 +-
 .../sql/kafka010/KafkaContinuousTest.scala | 8 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala | 33 +-
 .../sources/v2/BatchReadSupportProvider.java | 61 +++
 .../sources/v2/BatchWriteSupportProvider.java | 59 +++
 .../sql/sources/v2/ContinuousReadSupport.java | 46 --
 .../v2/ContinuousReadSupportProvider.java | 70 +++
 .../spark/sql/sources/v2/DataSourceV2.java | 10 +-
 .../sql/sources/v2/MicroBatchReadSupport.java | 52 ---
 .../v2/MicroBatchReadSupportProvider.java | 70 +++
 .../spark/sql/sources/v2/ReadSupport.java | 65 ---
 .../sql/sources/v2/SessionConfigSupport.java | 13 +-
 .../sql/sources/v2/StreamWriteSupport.java | 52 ---
 .../v2/StreamingWriteSupportProvider.java | 54 +++
 .../spark/sql/sources/v2/WriteSupport.java | 53 ---
 .../sql/sources/v2/reader/BatchReadSupport.java | 51 +++
 .../v2/reader/ContinuousInputPartition.java | 35 --
 .../sql/sources/v2/reader/DataSourceReader.java | 75 ----
 .../sql/sources/v2/reader/InputPartition.java | 26 +-
 .../sources/v2/reader/InputPartitionReader.java | 53 ---
 .../sql/sources/v2/reader/PartitionReader.java | 49 +++
 .../v2/reader/PartitionReaderFactory.java | 66 +++
 .../sql/sources/v2/reader/ReadSupport.java | 50 +++
 .../spark/sql/sources/v2/reader/ScanConfig.java | 45 ++
 .../sources/v2/reader/ScanConfigBuilder.java | 30 ++
 .../spark/sql/sources/v2/reader/Statistics.java | 2 +-
 .../v2/reader/SupportsDeprecatedScanRow.java | 39 --
 .../reader/SupportsPushDownCatalystFilters.java | 4 +-
 .../v2/reader/SupportsPushDownFilters.java | 6 +-
 .../reader/SupportsPushDownRequiredColumns.java | 8 +-
 .../v2/reader/SupportsReportPartitioning.java | 12 +-
 .../v2/reader/SupportsReportStatistics.java | 16 +-
 .../v2/reader/SupportsScanColumnarBatch.java | 53 ---
 .../partitioning/ClusteredDistribution.java | 4 +-
 .../v2/reader/partitioning/Distribution.java | 6 +-
 .../v2/reader/partitioning/Partitioning.java | 5 +-
 .../ContinuousInputPartitionReader.java | 36 --
 .../streaming/ContinuousPartitionReader.java | 37 ++
 .../ContinuousPartitionReaderFactory.java | 40 ++
 .../reader/streaming/ContinuousReadSupport.java | 77 ++++
 .../v2/reader/streaming/ContinuousReader.java | 79 ----
 .../reader/streaming/MicroBatchReadSupport.java | 60 +++
 .../v2/reader/streaming/MicroBatchReader.java | 75 ----
 .../sql/sources/v2/reader/streaming/Offset.java | 4 +-
 .../reader/streaming/StreamingReadSupport.java | 49 +++
 .../streaming/SupportsCustomReaderMetrics.java | 10 +-
 .../sources/v2/writer/BatchWriteSupport.java | 101 +++++
 .../sql/sources/v2/writer/DataSourceWriter.java | 116 -----
 .../spark/sql/sources/v2/writer/DataWriter.java | 16 +-
 .../sources/v2/writer/DataWriterFactory.java | 23 +-
 .../sources/v2/writer/WriterCommitMessage.java | 9 +-
 .../v2/writer/streaming/StreamWriter.java | 71 ---
 .../streaming/StreamingDataWriterFactory.java | 59 +++
 .../writer/streaming/StreamingWriteSupport.java | 71 +++
 .../streaming/SupportsCustomWriterMetrics.java | 10 +-
 .../org/apache/spark/sql/DataFrameReader.scala | 4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala | 8 +-
 .../datasources/v2/DataSourceRDD.scala | 44 +-
 .../datasources/v2/DataSourceV2Relation.scala | 72 +--
 .../datasources/v2/DataSourceV2ScanExec.scala | 65 ++-
 .../datasources/v2/DataSourceV2Strategy.scala | 49 ++-
 .../datasources/v2/DataSourceV2Utils.scala | 9 +
 .../v2/WriteToDataSourceV2Exec.scala | 40 +-
 .../streaming/MicroBatchExecution.scala | 91 ++--
 .../execution/streaming/ProgressReporter.scala | 22 +-
 .../SimpleStreamingScanConfigBuilder.scala | 40 ++
 .../execution/streaming/StreamingRelation.scala | 6 +-
 .../spark/sql/execution/streaming/console.scala | 14 +-
 .../continuous/ContinuousDataSourceRDD.scala | 37 +-
 .../continuous/ContinuousExecution.scala | 51 ++-
 .../continuous/ContinuousQueuedDataReader.scala | 29 +-
 .../continuous/ContinuousRateStreamSource.scala | 60 ++-
 .../continuous/ContinuousTextSocketSource.scala | 74 ++--
 .../continuous/ContinuousWriteRDD.scala | 7 +-
 .../streaming/continuous/EpochCoordinator.scala | 18 +-
 .../WriteToContinuousDataSource.scala | 4 +-
 .../WriteToContinuousDataSourceExec.scala | 10 +-
 .../spark/sql/execution/streaming/memory.scala | 51 +--
 .../streaming/sources/ConsoleWriteSupport.scala | 71 +++
 .../streaming/sources/ConsoleWriter.scala | 72 ---
 .../sources/ContinuousMemoryStream.scala | 76 ++--
 .../sources/ForeachWriteSupportProvider.scala | 140 ++++++
 .../sources/ForeachWriterProvider.scala | 139 ------
 .../sources/MicroBatchWritSupport.scala | 51 +++
 .../streaming/sources/MicroBatchWriter.scala | 37 --
 .../sources/PackedRowWriterFactory.scala | 9 +-
 .../RateControlMicroBatchReadSupport.scala | 31 ++
 .../RateStreamMicroBatchReadSupport.scala | 215 +++++++++
 .../sources/RateStreamMicroBatchReader.scala | 220 ----------
 .../streaming/sources/RateStreamProvider.scala | 27 +-
 .../execution/streaming/sources/memoryV2.scala | 62 +--
 .../execution/streaming/sources/socket.scala | 114 +++--
 .../spark/sql/streaming/DataStreamReader.scala | 52 ++-
 .../spark/sql/streaming/DataStreamWriter.scala | 9 +-
 .../sql/streaming/StreamingQueryManager.scala | 4 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java | 147 ++++---
 .../sql/sources/v2/JavaBatchDataSourceV2.java | 114 -----
 .../sources/v2/JavaColumnarDataSourceV2.java | 114 +++++
 .../v2/JavaPartitionAwareDataSource.java | 81 ++--
 .../v2/JavaSchemaRequiredDataSource.java | 26 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java | 68 +--
 .../sql/sources/v2/JavaSimpleReadSupport.java | 99 +++++
 ....apache.spark.sql.sources.DataSourceRegister | 4 +-
 .../execution/streaming/MemorySinkV2Suite.scala | 44 +-
 .../sources/ConsoleWriteSupportSuite.scala | 151 +++++++
 .../streaming/sources/ConsoleWriterSuite.scala | 153 -------
 .../sources/RateStreamProviderSuite.scala | 84 ++--
 .../sources/TextSocketStreamSuite.scala | 81 ++--
 .../sql/sources/v2/DataSourceV2Suite.scala | 435 ++++++++++---------
 .../sources/v2/SimpleWritableDataSource.scala | 110 ++---
 .../apache/spark/sql/streaming/StreamTest.scala | 2 +-
 .../streaming/StreamingQueryListenerSuite.scala | 4 +-
 .../sql/streaming/StreamingQuerySuite.scala | 59 +--
 .../ContinuousQueuedDataReaderSuite.scala | 45 +-
 .../streaming/continuous/ContinuousSuite.scala | 2 +-
 .../continuous/EpochCoordinatorSuite.scala | 18 +-
 .../sources/StreamingDataSourceV2Suite.scala | 95 ++--
 124 files changed, 4123 insertions(+), 3758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
new file mode 100644
index 0000000..4a18839
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeoutException
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[ContinuousReadSupport]] for data from kafka.
+ *
+ * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
+ *                     read by per-task consumers generated later.
+ * @param kafkaParams String params for per-task Kafka consumers.
+ * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
+ *                      are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in data loss
+ *                       scenarios, where some offsets after the specified initial ones can't be
+ *                       properly read.
+ */
+class KafkaContinuousReadSupport(
+    offsetReader: KafkaOffsetReader,
+    kafkaParams: ju.Map[String, Object],
+    sourceOptions: Map[String, String],
+    metadataPath: String,
+    initialOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends ContinuousReadSupport with Logging {
+
+  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+
+  override def initialOffset(): Offset = {
+    val offsets = initialOffsets match {
+      case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+      case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+    }
+    logInfo(s"Initial offsets: $offsets")
+    offsets
+  }
+
+  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
+    new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+    val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
+    startOffsets.toSeq.map {
+      case (topicPartition, start) =>
+        KafkaContinuousInputPartition(
+          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
+    }.toArray
+  }
+
+  override def createContinuousReaderFactory(
+      config: ScanConfig): ContinuousPartitionReaderFactory = {
+    KafkaContinuousReaderFactory
+  }
+
+  /** Stop this source and free any resources it has allocated. */
+  def stop(): Unit = synchronized {
+    offsetReader.close()
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    val mergedMap = offsets.map {
+      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+    }.reduce(_ ++ _)
+    KafkaSourceOffset(mergedMap)
+  }
+
+  override def needsReconfiguration(config: ScanConfig): Boolean = {
+    val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
+    offsetReader.fetchLatestOffsets().keySet != knownPartitions
+  }
+
+  override def toString(): String = s"KafkaSource[$offsetReader]"
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+}
+
+/**
+ * An input partition for continuous Kafka processing. This will be serialized and transformed
+ * into a full reader on executors.
+ *
+ * @param topicPartition The (topic, partition) pair this task is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ *                       are skipped.
+ */
+case class KafkaContinuousInputPartition(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends InputPartition
+
+object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
+  override def createReader(partition: InputPartition): ContinuousPartitionReader[InternalRow] = {
+    val p = partition.asInstanceOf[KafkaContinuousInputPartition]
+    new KafkaContinuousPartitionReader(
+      p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, p.failOnDataLoss)
+  }
+}
+
+class KafkaContinuousScanConfigBuilder(
+    schema: StructType,
+    startOffset: Offset,
+    offsetReader: KafkaOffsetReader,
+    reportDataLoss: String => Unit)
+  extends ScanConfigBuilder {
+
+  override def build(): ScanConfig = {
+    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
+
+    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
+    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
+    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
+    }
+
+    val startOffsets = newPartitionOffsets ++
+      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+    KafkaContinuousScanConfig(schema, startOffsets)
+  }
+}
+
+case class KafkaContinuousScanConfig(
+    readSchema: StructType,
+    startOffsets: Map[TopicPartition, Long])
+  extends ScanConfig {
+
+  // Created when building the scan config builder. If this diverges from the partitions at the
+  // latest offsets, we need to reconfigure the kafka read support.
+  def knownPartitions: Set[TopicPartition] = startOffsets.keySet
+}
+
+/**
+ * A per-task data reader for continuous Kafka processing.
+ *
+ * @param topicPartition The (topic, partition) pair this data reader is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ *                       are skipped.
+ */
+class KafkaContinuousPartitionReader(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] {
+  private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextKafkaOffset = startOffset
+  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
+
+  override def next(): Boolean = {
+    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    while (r == null) {
+      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
+
+        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
+        // or if it's the endpoint of the data range (i.e. the "true" next offset).
+        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+          val range = consumer.getAvailableOffsetRange()
+          if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
+            // retry
+          } else {
+            throw e
+          }
+      }
+    }
+    nextKafkaOffset = r.offset + 1
+    currentRecord = r
+    true
+  }
+
+  override def get(): UnsafeRow = {
+    converter.toUnsafeRow(currentRecord)
+  }
+
+  override def getOffset(): KafkaSourcePartitionOffset = {
+    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
+  }
+
+  override def close(): Unit = {
+    consumer.release()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
deleted file mode 100644
index be7ce3b..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.util.concurrent.TimeoutException
-
-import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
-import org.apache.kafka.common.TopicPartition
-
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.types.StructType
-
-/**
- * A [[ContinuousReader]] for data from kafka.
- *
- * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
- *                     read by per-task consumers generated later.
- * @param kafkaParams String params for per-task Kafka consumers.
- * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
- *                      are not Kafka consumer params.
- * @param metadataPath Path to a directory this reader can use for writing metadata.
- * @param initialOffsets The Kafka offsets to start reading data at.
- * @param failOnDataLoss Flag indicating whether reading should fail in data loss
- *                       scenarios, where some offsets after the specified initial ones can't be
- *                       properly read.
- */
-class KafkaContinuousReader(
-    offsetReader: KafkaOffsetReader,
-    kafkaParams: ju.Map[String, Object],
-    sourceOptions: Map[String, String],
-    metadataPath: String,
-    initialOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends ContinuousReader with Logging {
-
-  private lazy val session = SparkSession.getActiveSession.get
-  private lazy val sc = session.sparkContext
-
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
-
-  // Initialized when creating reader factories. If this diverges from the partitions at the latest
-  // offsets, we need to reconfigure.
-  // Exposed outside this object only for unit tests.
-  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
-
-  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
-
-  private var offset: Offset = _
-  override def setStartOffset(start: ju.Optional[Offset]): Unit = {
-    offset = start.orElse {
-      val offsets = initialOffsets match {
-        case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
-        case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
-      }
-      logInfo(s"Initial offsets: $offsets")
-      offsets
-    }
-  }
-
-  override def getStartOffset(): Offset = offset
-
-  override def deserializeOffset(json: String): Offset = {
-    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
-  }
-
-  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
-    import scala.collection.JavaConverters._
-
-    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
-
-    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
-    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
-    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-
-    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
-    if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
-    }
-
-    val startOffsets = newPartitionOffsets ++
-      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
-    knownPartitions = startOffsets.keySet
-
-    startOffsets.toSeq.map {
-      case (topicPartition, start) =>
-        KafkaContinuousInputPartition(
-          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss
-        ): InputPartition[InternalRow]
-    }.asJava
-  }
-
-  /** Stop this source and free any resources it has allocated. */
-  def stop(): Unit = synchronized {
-    offsetReader.close()
-  }
-
-  override def commit(end: Offset): Unit = {}
-
-  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
-    val mergedMap = offsets.map {
-      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
-    }.reduce(_ ++ _)
-    KafkaSourceOffset(mergedMap)
-  }
-
-  override def needsReconfiguration(): Boolean = {
-    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
-  }
-
-  override def toString(): String = s"KafkaSource[$offsetReader]"
-
-  /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
-   * Otherwise, just log a warning.
-   */
-  private def reportDataLoss(message: String): Unit = {
-    if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
-    } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
-    }
-  }
-}
-
-/**
- * An input partition for continuous Kafka processing. This will be serialized and transformed
- * into a full reader on executors.
- *
- * @param topicPartition The (topic, partition) pair this task is responsible for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
- *                       are skipped.
- */
-case class KafkaContinuousInputPartition(
-    topicPartition: TopicPartition,
-    startOffset: Long,
-    kafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] {
-
-  override def createContinuousReader(
-      offset: PartitionOffset): InputPartitionReader[InternalRow] = {
-    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
-    require(kafkaOffset.topicPartition == topicPartition,
-      s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
-    new KafkaContinuousInputPartitionReader(
-      topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
-  }
-
-  override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
-    new KafkaContinuousInputPartitionReader(
-      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
-  }
-}
-
-/**
- * A per-task data reader for continuous Kafka processing.
- *
- * @param topicPartition The (topic, partition) pair this data reader is responsible for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
- *                       are skipped.
- */
-class KafkaContinuousInputPartitionReader(
-    topicPartition: TopicPartition,
-    startOffset: Long,
-    kafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[InternalRow] {
-  private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
-  private val converter = new KafkaRecordToUnsafeRowConverter
-
-  private var nextKafkaOffset = startOffset
-  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
-
-  override def next(): Boolean = {
-    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
-    while (r == null) {
-      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
-      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
-      // interrupt points to end the query rather than waiting for new data that might never come.
-      try {
-        r = consumer.get(
-          nextKafkaOffset,
-          untilOffset = Long.MaxValue,
-          pollTimeoutMs,
-          failOnDataLoss)
-      } catch {
-        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
-        // swallow and ignore this.
-        case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
-
-        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
-        // or if it's the endpoint of the data range (i.e. the "true" next offset).
-        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
-          val range = consumer.getAvailableOffsetRange()
-          if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
-            // retry
-          } else {
-            throw e
-          }
-      }
-    }
-    nextKafkaOffset = r.offset + 1
-    currentRecord = r
-    true
-  }
-
-  override def get(): UnsafeRow = {
-    converter.toUnsafeRow(currentRecord)
-  }
-
-  override def getOffset(): KafkaSourcePartitionOffset = {
-    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
-  }
-
-  override def close(): Unit = {
-    consumer.release()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
new file mode 100644
index 0000000..c31af60
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
+import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReadSupport]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data lost is critical, the user
+ * must make sure all messages in a topic have been processed when deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReadSupport(
+    kafkaOffsetReader: KafkaOffsetReader,
+    executorKafkaParams: ju.Map[String, Object],
+    options: DataSourceOptions,
+    metadataPath: String,
+    startingOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics with Logging {
+
+  private val pollTimeoutMs = options.getLong(
+    "kafkaConsumer.pollTimeoutMs",
+    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
+
+  private val maxOffsetsPerTrigger =
+    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
+
+  private var endPartitionOffsets: KafkaSourceOffset = _
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  override def initialOffset(): Offset = {
+    KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
+  }
+
+  override def latestOffset(start: Offset): Offset = {
+    val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
+      rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+    }.getOrElse {
+      latestPartitionOffsets
+    })
+    endPartitionOffsets
+  }
+
+  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+  override def newScanConfigBuilder(start: Offset, end: Offset): ScanConfigBuilder = {
+    new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
+  }
+
+  override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
+    val sc = config.asInstanceOf[SimpleStreamingScanConfig]
+    val startPartitionOffsets = sc.start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+    val endPartitionOffsets = sc.end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
+      // We cannot get from offsets for some partitions. It means they got deleted.
+      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+    }
+    logInfo(s"Partitions added: $newPartitionInitialOffsets")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+    }
+
+    // Find deleted partitions, and report data loss if required
+    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
+    }
+
+    // Use the end partitions to calculate offset ranges to ignore partitions that have
+    // been deleted
+    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions that we don't know the from offsets.
+      newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    // Calculate offset ranges
+    val offsetRanges = rangeCalculator.getRanges(
+      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
+      untilOffsets = endPartitionOffsets,
+      executorLocations = getSortedExecutorList())
+
+    // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
+    // that is, concurrent tasks will not read the same TopicPartitions.
+    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
+
+    // Generate factories based on the offset ranges
+    offsetRanges.map { range =>
+      KafkaMicroBatchInputPartition(
+        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
+    }.toArray
+  }
+
+  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory = {
+    KafkaMicroBatchReaderFactory
+  }
+
+  // TODO: figure out the life cycle of custom metrics, and make this method take `ScanConfig` as
+  // a parameter.
+  override def getCustomMetrics(): CustomMetrics = {
+    KafkaCustomMetrics(
+      kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets.partitionToOffsets)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def stop(): Unit = {
+    kafkaOffsetReader.close()
+  }
+
+  override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
+
+  /**
+   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
+   * the checkpoint.
+   */
+  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
+    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
+    // (KAFKA-1894).
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
+    assert(SparkSession.getActiveSession.nonEmpty)
+
+    val metadataLog =
+      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
+    metadataLog.get(0).getOrElse {
+      val offsets = startingOffsets match {
+        case EarliestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+        case LatestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+        case SpecificOffsetRangeLimit(p) =>
+          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+      }
+      metadataLog.add(0, offsets)
+      logInfo(s"Initial offsets: $offsets")
+      offsets
+    }.partitionToOffsets
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: PartitionOffsetMap,
+      until: PartitionOffsetMap): PartitionOffsetMap = {
+    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap {
+      case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map {
+        case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
+            val begin = from.get(tp).getOrElse(fromNew(tp))
+            val prorate = limit * (size / total)
+            // Don't completely starve small topicpartitions
+            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
+            // Paranoia, make sure not to return an offset that's past end
+            Math.min(end, off)
+          }.getOrElse(end)
+      }
+    }
+  }
+
+  private def getSortedExecutorList(): Array[String] = {
+
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+
+  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
+  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
+    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+    val VERSION = 1
+
+    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
+      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
+      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+      writer.write("v" + VERSION + "\n")
+      writer.write(metadata.json)
+      writer.flush
+    }
+
+    override def deserialize(in: InputStream): KafkaSourceOffset = {
+      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
+      // HDFSMetadataLog guarantees that it never creates a partial file.
+      assert(content.length != 0)
+      if (content(0) == 'v') {
+        val indexOfNewLine = content.indexOf("\n")
+        if (indexOfNewLine > 0) {
+          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
+          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
+        } else {
+          throw new IllegalStateException(
+            s"Log file was malformed: failed to detect the log file version line.")
+        }
+      } else {
+        // The log was generated by Spark 2.1.0
+        KafkaSourceOffset(SerializedOffset(content))
+      }
+    }
+  }
+}
+
+/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartition(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends InputPartition
+
+private[kafka010] object KafkaMicroBatchReaderFactory extends PartitionReaderFactory {
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+    val p = partition.asInstanceOf[KafkaMicroBatchInputPartition]
+    KafkaMicroBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs,
+      p.failOnDataLoss, p.reuseKafkaConsumer)
+  }
+}
+
+/** A [[PartitionReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchPartitionReader(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends PartitionReader[InternalRow] with Logging {
+
+  private val consumer = KafkaDataConsumer.acquire(
+    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
+
+  private val rangeToRead = resolveRange(offsetRange)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextOffset = rangeToRead.fromOffset
+  private var nextRow: UnsafeRow = _
+
+  override def next(): Boolean = {
+    if (nextOffset < rangeToRead.untilOffset) {
+      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
+      if (record != null) {
+        nextRow = converter.toUnsafeRow(record)
+        true
+      } else {
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  override def get(): UnsafeRow = {
+    assert(nextRow != null)
+    nextOffset += 1
+    nextRow
+  }
+
+  override def close(): Unit = {
+    consumer.release()
+  }
+
+  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+    if (range.fromOffset < 0 || range.untilOffset < 0) {
+      // Late bind the offset range
+      val availableOffsetRange = consumer.getAvailableOffsetRange()
+      val fromOffset = if (range.fromOffset < 0) {
+        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
+        availableOffsetRange.earliest
+      } else {
+        range.fromOffset
+      }
+      val untilOffset = if (range.untilOffset < 0) {
+        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
+        availableOffsetRange.latest
+      } else {
+        range.untilOffset
+      }
+      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
+    } else {
+      range
+    }
+  }
+}
+
+/**
+ * Currently reports per topic-partition lag.
+ * This is the difference between the offset of the latest available data
+ * in a topic-partition and the latest offset that has been processed.
+ */
+private[kafka010] case class KafkaCustomMetrics(
+    latestOffsets: Map[TopicPartition, Long],
+    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
+  override def json(): String = {
+    JsonUtils.partitionLags(latestOffsets, processedOffsets)
+  }
+
+  override def toString: String = json()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
deleted file mode 100644
index 900c9f4..0000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.io._
-import java.nio.charset.StandardCharsets
-
-import scala.collection.JavaConverters._
-
-import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
-import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset, SupportsCustomReaderMetrics}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.UninterruptibleThread
-
-/**
- * A [[MicroBatchReader]] that reads data from Kafka.
- *
- * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
- * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
- * example if the last record in a Kafka topic "t", partition 2 is offset 5, then
- * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent
- * with the semantics of `KafkaConsumer.position()`.
- *
- * Zero data lost is not guaranteed when topics are deleted. If zero data lost is critical, the user
- * must make sure all messages in a topic have been processed when deleting a topic.
- *
- * There is a known issue caused by KAFKA-1894: the query using Kafka maybe cannot be stopped.
- * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers
- * and not use wrong broker addresses.
- */
-private[kafka010] class KafkaMicroBatchReader(
-    kafkaOffsetReader: KafkaOffsetReader,
-    executorKafkaParams: ju.Map[String, Object],
-    options: DataSourceOptions,
-    metadataPath: String,
-    startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends MicroBatchReader with SupportsCustomReaderMetrics with Logging {
-
-  private var startPartitionOffsets: PartitionOffsetMap = _
-  private var endPartitionOffsets: PartitionOffsetMap = _
-
-  private val pollTimeoutMs = options.getLong(
-    "kafkaConsumer.pollTimeoutMs",
-    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
-
-  private val maxOffsetsPerTrigger =
-    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
-
-  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
-  /**
-   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
-   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
-   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
-   */
-  private lazy val initialPartitionOffsets = getOrCreateInitialPartitionOffsets()
-
-  override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
-    // Make sure initialPartitionOffsets is initialized
-    initialPartitionOffsets
-
-    startPartitionOffsets = Option(start.orElse(null))
-        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
-        .getOrElse(initialPartitionOffsets)
-
-    endPartitionOffsets = Option(end.orElse(null))
-        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
-        .getOrElse {
-          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
-          maxOffsetsPerTrigger.map { maxOffsets =>
-            rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
-          }.getOrElse {
-            latestPartitionOffsets
-          }
-        }
-  }
-
-  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
-    }
-    logInfo(s"Partitions added: $newPartitionInitialOffsets")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
-    }
-
-    // Find deleted partitions, and report data loss if required
-    val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
-    }
-
-    // Use the end partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    // Calculate offset ranges
-    val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
-      executorLocations = getSortedExecutorList())
-
-    // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
-    // that is, concurrent tasks will not read the same TopicPartitions.
-    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size
-
-    // Generate factories based on the offset ranges
-    offsetRanges.map { range =>
-      new KafkaMicroBatchInputPartition(
-        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer
-      ): InputPartition[InternalRow]
-    }.asJava
-  }
-
-  override def getStartOffset: Offset = {
-    KafkaSourceOffset(startPartitionOffsets)
-  }
-
-  override def getEndOffset: Offset = {
-    KafkaSourceOffset(endPartitionOffsets)
-  }
-
-  override def getCustomMetrics: CustomMetrics = {
-    KafkaCustomMetrics(kafkaOffsetReader.fetchLatestOffsets(), endPartitionOffsets)
-  }
-
-  override def deserializeOffset(json: String): Offset = {
-    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
-  }
-
-  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
-  override def commit(end: Offset): Unit = {}
-
-  override def stop(): Unit = {
-    kafkaOffsetReader.close()
-  }
-
-  override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
-
-  /**
-   * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
-   * the checkpoint.
-   */
-  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
-    // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread.
-    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever
-    // (KAFKA-1894).
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-
-    // SparkSession is required for getting Hadoop configuration for writing to checkpoints
-    assert(SparkSession.getActiveSession.nonEmpty)
-
-    val metadataLog =
-      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath)
-    metadataLog.get(0).getOrElse {
-      val offsets = startingOffsets match {
-        case EarliestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
-        case SpecificOffsetRangeLimit(p) =>
-          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
-      }
-      metadataLog.add(0, offsets)
-      logInfo(s"Initial offsets: $offsets")
-      offsets
-    }.partitionToOffsets
-  }
-
-  /** Proportionally distribute limit number of offsets among topicpartitions */
-  private def rateLimit(
-      limit: Long,
-      from: PartitionOffsetMap,
-      until: PartitionOffsetMap): PartitionOffsetMap = {
-    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
-    val sizes = until.flatMap {
-      case (tp, end) =>
-        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
-        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
-          val size = end - begin
-          logDebug(s"rateLimit $tp size is $size")
-          if (size > 0) Some(tp -> size) else None
-        }
-    }
-    val total = sizes.values.sum.toDouble
-    if (total < 1) {
-      until
-    } else {
-      until.map {
-        case (tp, end) =>
-          tp -> sizes.get(tp).map { size =>
-            val begin = from.get(tp).getOrElse(fromNew(tp))
-            val prorate = limit * (size / total)
-            // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
-            // Paranoia, make sure not to return an offset that's past end
-            Math.min(end, off)
-          }.getOrElse(end)
-      }
-    }
-  }
-
-  private def getSortedExecutorList(): Array[String] = {
-
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
-  /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
-   * Otherwise, just log a warning.
-   */
-  private def reportDataLoss(message: String): Unit = {
-    if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
-    } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
-    }
-  }
-
-  /** A version of [[HDFSMetadataLog]] specialized for saving the initial offsets. */
-  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath: String)
-    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
-
-    val VERSION = 1
-
-    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-      out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
-      val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-      writer.write("v" + VERSION + "\n")
-      writer.write(metadata.json)
-      writer.flush
-    }
-
-    override def deserialize(in: InputStream): KafkaSourceOffset = {
-      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
-      val content = IOUtils.toString(new InputStreamReader(in, StandardCharsets.UTF_8))
-      // HDFSMetadataLog guarantees that it never creates a partial file.
-      assert(content.length != 0)
-      if (content(0) == 'v') {
-        val indexOfNewLine = content.indexOf("\n")
-        if (indexOfNewLine > 0) {
-          val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
-          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
-        } else {
-          throw new IllegalStateException(
-            s"Log file was malformed: failed to detect the log file version line.")
-        }
-      } else {
-        // The log was generated by Spark 2.1.0
-        KafkaSourceOffset(SerializedOffset(content))
-      }
-    }
-  }
-}
-
-/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchInputPartition(
-    offsetRange: KafkaOffsetRange,
-    executorKafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends InputPartition[InternalRow] {
-
-  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
-
-  override def createPartitionReader(): InputPartitionReader[InternalRow] =
-    new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
-      failOnDataLoss, reuseKafkaConsumer)
-}
-
-/** A [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchInputPartitionReader(
-    offsetRange: KafkaOffsetRange,
-    executorKafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends InputPartitionReader[InternalRow] with Logging {
-
-  private val consumer = KafkaDataConsumer.acquire(
-    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
-
-  private val rangeToRead = resolveRange(offsetRange)
-  private val converter = new KafkaRecordToUnsafeRowConverter
-
-  private var nextOffset = rangeToRead.fromOffset
-  private var nextRow: UnsafeRow = _
-
-  override def next(): Boolean = {
-    if (nextOffset < rangeToRead.untilOffset) {
-      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
-      if (record != null) {
-        nextRow = converter.toUnsafeRow(record)
-        true
-      } else {
-        false
-      }
-    } else {
-      false
-    }
-  }
-
-  override def get(): UnsafeRow = {
-    assert(nextRow != null)
-    nextOffset += 1
-    nextRow
-  }
-
-  override def close(): Unit = {
-    consumer.release()
-  }
-
-  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
-    if (range.fromOffset < 0 || range.untilOffset < 0) {
-      // Late bind the offset range
-      val availableOffsetRange = consumer.getAvailableOffsetRange()
-      val fromOffset = if (range.fromOffset < 0) {
-        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
-          s"earliest offset ${range.fromOffset} does not equal ${KafkaOffsetRangeLimit.EARLIEST}")
-        availableOffsetRange.earliest
-      } else {
-        range.fromOffset
-      }
-      val untilOffset = if (range.untilOffset < 0) {
-        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
-          s"latest offset ${range.untilOffset} does not equal ${KafkaOffsetRangeLimit.LATEST}")
-        availableOffsetRange.latest
-      } else {
-        range.untilOffset
-      }
-      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
-    } else {
-      range
-    }
-  }
-}
-
-/**
- * Currently reports per topic-partition lag.
- * This is the difference between the offset of the latest available data
- * in a topic-partition and the latest offset that has been processed.
- */
-private[kafka010] case class KafkaCustomMetrics(
-    latestOffsets: Map[TopicPartition, Long],
-    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
-  override def json(): String = {
-    JsonUtils.partitionLags(latestOffsets, processedOffsets)
-  }
-
-  override def toString: String = json()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index d225c1e..28c9853 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,9 +30,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

@@ -46,9 +45,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with StreamWriteSupport
-    with ContinuousReadSupport
-    with MicroBatchReadSupport
+    with StreamingWriteSupportProvider
+    with ContinuousReadSupportProvider
+    with MicroBatchReadSupportProvider
     with Logging {
   import KafkaSourceProvider._

@@ -108,13 +107,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
-   * of Kafka data in a micro-batch streaming query.
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to read
+   * batches of Kafka data in a micro-batch streaming query.
    */
-  override def createMicroBatchReader(
-      schema: Optional[StructType],
+  override def createMicroBatchReadSupport(
       metadataPath: String,
-      options: DataSourceOptions): KafkaMicroBatchReader = {
+      options: DataSourceOptions): KafkaMicroBatchReadSupport = {

     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
@@ -140,7 +138,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       parameters,
       driverGroupIdPrefix = s"$uniqueGroupId-driver")

-    new KafkaMicroBatchReader(
+    new KafkaMicroBatchReadSupport(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       options,
@@ -150,13 +148,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   /**
-   * Creates a [[ContinuousInputPartitionReader]] to read
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
    * Kafka data in a continuous streaming query.
*/ - override def createContinuousReader( - schema: Optional[StructType], + override def createContinuousReadSupport( metadataPath: String, - options: DataSourceOptions): KafkaContinuousReader = { + options: DataSourceOptions): KafkaContinuousReadSupport = { val parameters = options.asMap().asScala.toMap validateStreamOptions(parameters) // Each running query should use its own group id. Otherwise, the query may be only assigned @@ -181,7 +178,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister parameters, driverGroupIdPrefix = s"$uniqueGroupId-driver") - new KafkaContinuousReader( + new KafkaContinuousReadSupport( kafkaOffsetReader, kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), parameters, @@ -270,11 +267,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } } - override def createStreamWriter( + override def createStreamingWriteSupport( queryId: String, schema: StructType, mode: OutputMode, - options: DataSourceOptions): StreamWriter = { + options: DataSourceOptions): StreamingWriteSupport = { import scala.collection.JavaConverters._ val spark = SparkSession.getActiveSession.get @@ -285,7 +282,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister KafkaWriter.validateQuery( schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic) - new KafkaStreamWriter(topic, producerParams, schema) + new KafkaStreamingWriteSupport(topic, producerParams, schema) } private def strategy(caseInsensitiveParams: Map[String, String]) = http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala deleted file 
mode 100644 index 5f0802b..0000000 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery -import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter -import org.apache.spark.sql.types.StructType - -/** - * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we - * don't need to really send one. - */ -case object KafkaWriterCommitMessage extends WriterCommitMessage - -/** - * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer factory. - * - * @param topic The topic this writer is responsible for. If None, topic will be inferred from - * a `topic` field in the incoming data. - * @param producerParams Parameters for Kafka producers in each task. - * @param schema The schema of the input data. 
- */ -class KafkaStreamWriter( - topic: Option[String], producerParams: Map[String, String], schema: StructType) - extends StreamWriter { - - validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic) - - override def createWriterFactory(): KafkaStreamWriterFactory = - KafkaStreamWriterFactory(topic, producerParams, schema) - - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} -} - -/** - * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate - * the per-task data writers. - * @param topic The topic that should be written to. If None, topic will be inferred from - * a `topic` field in the incoming data. - * @param producerParams Parameters for Kafka producers in each task. - * @param schema The schema of the input data. - */ -case class KafkaStreamWriterFactory( - topic: Option[String], producerParams: Map[String, String], schema: StructType) - extends DataWriterFactory[InternalRow] { - - override def createDataWriter( - partitionId: Int, - taskId: Long, - epochId: Long): DataWriter[InternalRow] = { - new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes) - } -} - -/** - * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to - * process incoming rows. - * - * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred - * from a `topic` field in the incoming data. - * @param producerParams Parameters to use for the Kafka producer. - * @param inputSchema The attributes in the input data. 
- */ -class KafkaStreamDataWriter( - targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute]) - extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - import scala.collection.JavaConverters._ - - private lazy val producer = CachedKafkaProducer.getOrCreate( - new java.util.HashMap[String, Object](producerParams.asJava)) - - def write(row: InternalRow): Unit = { - checkForErrors() - sendRow(row, producer) - } - - def commit(): WriterCommitMessage = { - // Send is asynchronous, but we can't commit until all rows are actually in Kafka. - // This requires flushing and then checking that no callbacks produced errors. - // We also check for errors before to fail as soon as possible - the check is cheap. - checkForErrors() - producer.flush() - checkForErrors() - KafkaWriterCommitMessage - } - - def abort(): Unit = {} - - def close(): Unit = { - checkForErrors() - if (producer != null) { - producer.flush() - checkForErrors() - CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava)) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala new file mode 100644 index 0000000..dc19312 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} +import org.apache.spark.sql.types.StructType + +/** + * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we + * don't need to really send one. + */ +case object KafkaWriterCommitMessage extends WriterCommitMessage + +/** + * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating the writer factory. + * + * @param topic The topic this writer is responsible for. If None, topic will be inferred from + * a `topic` field in the incoming data. + * @param producerParams Parameters for Kafka producers in each task. + * @param schema The schema of the input data. 
+ */ +class KafkaStreamingWriteSupport( + topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends StreamingWriteSupport { + + validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic) + + override def createStreamingWriterFactory(): KafkaStreamWriterFactory = + KafkaStreamWriterFactory(topic, producerParams, schema) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} +} + +/** + * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate + * the per-task data writers. + * @param topic The topic that should be written to. If None, topic will be inferred from + * a `topic` field in the incoming data. + * @param producerParams Parameters for Kafka producers in each task. + * @param schema The schema of the input data. + */ +case class KafkaStreamWriterFactory( + topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends StreamingDataWriterFactory { + + override def createWriter( + partitionId: Int, + taskId: Long, + epochId: Long): DataWriter[InternalRow] = { + new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes) + } +} + +/** + * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to + * process incoming rows. + * + * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred + * from a `topic` field in the incoming data. + * @param producerParams Parameters to use for the Kafka producer. + * @param inputSchema The attributes in the input data. 
+ */ +class KafkaStreamDataWriter( + targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute]) + extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { + import scala.collection.JavaConverters._ + + private lazy val producer = CachedKafkaProducer.getOrCreate( + new java.util.HashMap[String, Object](producerParams.asJava)) + + def write(row: InternalRow): Unit = { + checkForErrors() + sendRow(row, producer) + } + + def commit(): WriterCommitMessage = { + // Send is asynchronous, but we can't commit until all rows are actually in Kafka. + // This requires flushing and then checking that no callbacks produced errors. + // We also check for errors before to fail as soon as possible - the check is cheap. + checkForErrors() + producer.flush() + checkForErrors() + KafkaWriterCommitMessage + } + + def abort(): Unit = {} + + def close(): Unit = { + checkForErrors() + if (producer != null) { + producer.flush() + checkForErrors() + CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava)) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index ea2a2a8..3216650 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -61,10 +61,12 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.logical.collectFirst { - case 
StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r - }.exists { r => + case r: StreamingDataSourceV2Relation + if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] => + r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig] + }.exists { config => // Ensure the new topic is present and the old topic is gone. - r.knownPartitions.exists(_.topic == topic2) + config.knownPartitions.exists(_.topic == topic2) }, s"query never reconfigured to new topic $topic2") } http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala index fa1468a..fa6bdc2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart} -import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.streaming.Trigger @@ -46,8 +46,10 @@ trait KafkaContinuousTest extends KafkaSourceTest { testUtils.addPartitions(topic, newCount) eventually(timeout(streamingTimeout)) { assert( - query.lastExecution.logical.collectFirst { - case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r + 
 query.lastExecution.executedPlan.collectFirst { + case scan: DataSourceV2ScanExec + if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] => + scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig] }.exists(_.knownPartitions.size == newCount), s"query never reconfigured to $newCount partitions") }
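The updated tests in KafkaContinuousSourceSuite and KafkaContinuousTest no longer pattern-match on the reader instance itself. They pick the first plan node whose read support is the Kafka one and then inspect its scan config. A minimal, self-contained sketch of that lookup follows. Every type in it (`PlanNode`, `ScanNode`, `KafkaLikeReadSupport`, `KafkaLikeScanConfig`, `ScanConfigLookup`) is a hypothetical stand-in rather than a Spark class, and a plain `Seq` takes the place of the plan tree; only the guarded `collectFirst` followed by `exists` is taken from the diff.

```scala
// Hypothetical stand-ins for the Spark/Kafka classes named in the diff.
trait ReadSupport
final class KafkaLikeReadSupport extends ReadSupport
final class OtherReadSupport extends ReadSupport

trait ScanConfig
final case class KafkaLikeScanConfig(knownPartitions: Set[Int]) extends ScanConfig

// A flattened "plan": the real tests traverse a plan tree, this sketch uses a Seq.
sealed trait PlanNode
final case class ScanNode(readSupport: ReadSupport, scanConfig: ScanConfig) extends PlanNode
final case class ProjectNode(name: String) extends PlanNode

object ScanConfigLookup {
  // Returns the config of the first scan backed by the Kafka-like read support, if any.
  def findKafkaConfig(plan: Seq[PlanNode]): Option[KafkaLikeScanConfig] =
    plan.collectFirst {
      case scan: ScanNode if scan.readSupport.isInstanceOf[KafkaLikeReadSupport] =>
        scan.scanConfig.asInstanceOf[KafkaLikeScanConfig]
    }

  // Same shape as the `.exists(_.knownPartitions.size == newCount)` check in the test.
  def reconfiguredTo(plan: Seq[PlanNode], newCount: Int): Boolean =
    findKafkaConfig(plan).exists(_.knownPartitions.size == newCount)
}
```

The guard on `readSupport` matters because the cast to the Kafka-specific config is only safe once the node is known to belong to the Kafka source; scans from other sources are skipped instead of failing the cast.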