Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185198458
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.io.Closeable
    +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.{Partition, TaskContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
    +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers.
    + *
    + * This will be instantiated once per partition - successive calls to compute() in the
    + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of
    + * offsets across epochs.
    + *
    + * For performance reasons, this is very weakly encapsulated. There are six handles for the RDD:
    + *  * currentOffset - contains the offset of the most recent row which a compute() iterator has sent
    + *    upwards. The RDD is responsible for advancing this.
    + *  * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing
    + *    this before ending the compute() iterator.
    + *  * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The
    + *    ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it.
    + *  * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are
    + *    still running. These threads won't be restarted if they fail, so the RDD should intercept
    + *    this state when convenient to fail the query.
    + *  * close() - to close this reader when the query is going to shut down.
    + */
    +class ContinuousQueuedDataReader(
    +    split: Partition,
    +    context: TaskContext,
    +    dataQueueSize: Int,
    +    epochPollIntervalMs: Long) extends Closeable {
    +  private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]]
    +    .readerFactory.createDataReader()
    +
    +  // Important sequencing - we must get our starting point before the provider threads start running
    +  var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset
    +  var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  // This queue contains two types of messages:
    +  // * (null, null) representing an epoch boundary.
    +  // * (row, off) containing a data row and its corresponding PartitionOffset.
    +  val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize)
    +
    +  val epochPollFailed = new AtomicBoolean(false)
    +  val dataReaderFailed = new AtomicBoolean(false)
    +
    +  private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
    +
    +  private val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    +    s"epoch-poll--$coordinatorId--${context.partitionId()}")
    +  val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed)
    +  epochPollExecutor.scheduleWithFixedDelay(
    +    epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
    +
    +  val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed)
    +  dataReaderThread.setDaemon(true)
    +  dataReaderThread.start()
    +
    +  context.addTaskCompletionListener(_ => {
    --- End diff --
    
    Maybe better to just call `close()` here, if `this` is visible.
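    
    A minimal sketch of that suggestion in Scala, assuming `close()` becomes the single owner of the teardown (the listener body and the exact cleanup steps below are illustrative, since the diff is truncated before the listener's contents):
    
        // Sketch: register close() itself as the task completion callback, so
        // the cleanup logic lives in one place instead of inline in the listener.
        context.addTaskCompletionListener(_ => this.close())
    
        // close() would then perform the teardown, for example (illustrative only):
        override def close(): Unit = {
          dataReaderThread.interrupt()
          epochPollExecutor.shutdown()
        }
    
    This would keep the shutdown path consistent whether the task completes normally or the query calls `close()` directly.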

