Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185668041
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
 ---
    @@ -0,0 +1,199 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.Closeable
    +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * The record types in a continuous processing buffer.
    + */
    +sealed trait ContinuousRecord
    +case object EpochMarker extends ContinuousRecord
    +case class ContinuousRow(row: UnsafeRow, offset: PartitionOffset) extends 
ContinuousRecord
    +
    +/**
    + * A wrapper for a continuous processing data reader, including a reading 
queue and epoch markers.
    + *
    + * This will be instantiated once per partition - successive calls to 
compute() in the
    + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is 
required to get continuity of
    + * offsets across epochs.
    + *
    + * The RDD is responsible for advancing two fields here, since they need 
to be updated in line
    + * with the data flow:
    + *  * currentOffset - contains the offset of the most recent row which a 
compute() iterator has sent
    + *    upwards. The RDD is responsible for advancing this.
    + *  * currentEpoch - the epoch which is currently occurring. The RDD is 
responsible for incrementing
    + *    this before ending the compute() iterator.
    + */
    +class ContinuousQueuedDataReader(
    +    factory: DataReaderFactory[UnsafeRow],
    +    context: TaskContext,
    +    dataQueueSize: Int,
    +    epochPollIntervalMs: Long) extends Closeable {
    +  private val reader = factory.createDataReader()
    +
    +  // Important sequencing - we must get our starting point before the 
provider threads start running
    +  var currentOffset: PartitionOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +  var currentEpoch: Long = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  private val queue = new 
ArrayBlockingQueue[ContinuousRecord](dataQueueSize)
    +
    +  private val epochPollFailed = new AtomicBoolean(false)
    +  private val dataReaderFailed = new AtomicBoolean(false)
    +
    +  private val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
    +
    +  private val epochMarkerExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +    s"epoch-poll--$coordinatorId--${context.partitionId()}")
    +  private val epochMarkerGenerator = new EpochMarkerGenerator(queue, 
context, epochPollFailed)
    +  epochMarkerExecutor.scheduleWithFixedDelay(
    +    epochMarkerGenerator, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +  private val dataReaderThread = new DataReaderThread(reader, queue, 
context, dataReaderFailed)
    +  dataReaderThread.setDaemon(true)
    +  dataReaderThread.start()
    +
    +  context.addTaskCompletionListener(_ => {
    +    this.close()
    +  })
    +
    +  def next(): ContinuousRecord = {
    +    val POLL_TIMEOUT_MS = 1000
    +    var currentEntry: ContinuousRecord = null
    +
    +    while (currentEntry == null) {
    +      if (context.isInterrupted() || context.isCompleted()) {
    +        // Force the epoch to end here. The writer will notice the context 
is interrupted
    +        // or completed and not start a new one. This makes it possible to 
achieve clean
    +        // shutdown of the streaming query.
    +        // TODO: The obvious generalization of this logic to multiple 
stages won't work. It's
    +        // invalid to send an epoch marker from the bottom of a task if 
all its child tasks
    +        // haven't sent one.
    +        currentEntry = EpochMarker
    +      } else {
    +        if (dataReaderFailed.get()) {
    --- End diff --
    
    As mentioned above, we can simply check `dataReaderThread.failureReason` 
here instead of keeping a separate `dataReaderFailed` flag.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to