Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21200#discussion_r185662378
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition,
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader,
PartitionOffset}
+import org.apache.spark.util.{NextIterator, ThreadUtils}
+
+class ContinuousDataSourceRDDPartition(
+ val index: Int,
+ val readerFactory: DataReaderFactory[UnsafeRow])
+ extends Partition with Serializable {
+
+ // This is semantically a lazy val - it's initialized once the first
time a call to
+ // ContinuousDataSourceRDD.compute() needs to access it, so it can be
shared across
+ // all compute() calls for a partition. This ensures that one compute()
picks up where the
+ // previous one ended.
+ // We don't make it actually a lazy val because it needs input which
isn't available here.
+ // This will only be initialized on the executors.
+ private[continuous] var queueReader: ContinuousQueuedDataReader = _
+}
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+ sc: SparkContext,
+ dataQueueSize: Int,
+ epochPollIntervalMs: Long,
+ @transient private val readerFactories:
Seq[DataReaderFactory[UnsafeRow]])
+ extends RDD[UnsafeRow](sc, Nil) {
+
+ override protected def getPartitions: Array[Partition] = {
+ readerFactories.zipWithIndex.map {
+ case (readerFactory, index) => new
ContinuousDataSourceRDDPartition(index, readerFactory)
+ }.toArray
+ }
+
+ // Initializes the per-task reader if not already done, and then
produces the UnsafeRow
+ // iterator for the current epoch.
+ // Note that the iterator is also responsible for advancing some fields
in the per-task
+ // reader that need to be shared across epochs.
+ override def compute(split: Partition, context: TaskContext):
Iterator[UnsafeRow] = {
+ // If attempt number isn't 0, this is a task retry, which we don't
support.
+ if (context.attemptNumber() != 0) {
+ throw new ContinuousTaskRetryException()
+ }
+
+ val readerForPartition = {
+ val partition = split.asInstanceOf[ContinuousDataSourceRDDPartition]
+ if (partition.queueReader == null) {
+ partition.queueReader =
+ new ContinuousQueuedDataReader(
+ partition.readerFactory, context, dataQueueSize,
epochPollIntervalMs)
+ }
+
+ partition.queueReader
+ }
+
+ val coordinatorId =
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+ val epochEndpoint = EpochCoordinatorRef.get(coordinatorId,
SparkEnv.get)
+ new NextIterator[UnsafeRow] {
+ override def getNext(): UnsafeRow = {
+ readerForPartition.next() match {
+ // epoch boundary marker
+ case EpochMarker =>
+ epochEndpoint.send(ReportPartitionOffset(
+ context.partitionId(),
+ readerForPartition.currentEpoch,
+ readerForPartition.currentOffset))
+ readerForPartition.currentEpoch += 1
+ finished = true
+ null
+ // real row
+ case ContinuousRow(row, offset) =>
+ readerForPartition.currentOffset = offset
--- End diff --
Why does this need to be set by this class? The reader can set it as it
returns ContinuousRows. So this can be a publicly read-only field for the RDD
to report the current offset.
Actually, if you add lastOffset to the EpochMarker (and remove offset from
ContinuousRow since its not needed as the currentOffset is updated internally
by the reader), then currentOffset public method is not needed at all. More
minimal interface.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]