Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12592#discussion_r60793559
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
    @@ -19,72 +19,129 @@ package org.apache.spark.sql.streaming
     
     import java.io.File
     
    -import org.apache.spark.sql.{AnalysisException, StreamTest}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType}
     import org.apache.spark.sql.catalyst.util._
     import org.apache.spark.sql.execution.streaming._
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.sql.types.{StringType, StructType}
    +import org.apache.spark.sql.{AnalysisException, DataFrame, StreamTest}
     import org.apache.spark.util.Utils
     
     class FileStreamSourceTest extends StreamTest with SharedSQLContext {
     
       import testImplicits._
     
    -  case class AddTextFileData(source: FileStreamSource, content: String, 
src: File, tmp: File)
    -    extends AddData {
    +  /**
    +   * A subclass [[AddData]] for adding data to files. This is meant to use 
the
    +   * [[FileStreamSource]] actually being used in the execution.
    +   */
    +  abstract class AddFileData extends AddData {
    +    override def addData(query: Option[StreamExecution]): (Source, Offset) 
= {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active 
file stream source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source, _) if 
source.isInstanceOf[FileStreamSource] =>
    +          source.asInstanceOf[FileStreamSource]
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find file source in the StreamExecution logical plan 
to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the file source in the StreamExecution logical 
plan as there" +
    +            "are multiple file sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val source = sources.head
    +      val newOffset = source.withBatchingLocked {
    +        addData(source)
    +        source.currentOffset + 1
    +      }
    +      logInfo(s"Added data to $source at offset $newOffset")
    +      (source, newOffset)
    +    }
    +
    +    protected def addData(source: FileStreamSource): Unit
    +  }
     
    -    override def addData(): Offset = {
    -      source.withBatchingLocked {
    -        val file = Utils.tempFileWith(new File(tmp, "text"))
    -        stringToFile(file, content).renameTo(new File(src, file.getName))
    -        source.currentOffset
    -      } + 1
    +  case class AddTextFileData(content: String, src: File, tmp: File)
    +    extends AddFileData {
    +
    +    override def addData(source: FileStreamSource): Unit = {
    +      val file = Utils.tempFileWith(new File(tmp, "text"))
    +      stringToFile(file, content).renameTo(new File(src, file.getName))
         }
       }
     
    -  case class AddParquetFileData(
    -      source: FileStreamSource,
    -      content: Seq[String],
    -      src: File,
    -      tmp: File) extends AddData {
    -
    -    override def addData(): Offset = {
    -      source.withBatchingLocked {
    -        val file = Utils.tempFileWith(new File(tmp, "parquet"))
    -        content.toDS().toDF().write.parquet(file.getCanonicalPath)
    -        file.renameTo(new File(src, file.getName))
    -        source.currentOffset
    -      } + 1
    +  case class AddParquetFileData(data: DataFrame, src: File, tmp: File) 
extends AddFileData {
    +    override def addData(source: FileStreamSource): Unit = {
    +      AddParquetFileData.writeToFile(data, src, tmp)
         }
       }
     
    -  /** Use `format` and `path` to create FileStreamSource via 
DataFrameReader */
    -  def createFileStreamSource(
    +  object AddParquetFileData {
    +    def apply(seq: Seq[String], src: File, tmp: File): AddParquetFileData 
= {
    +      AddParquetFileData(seq.toDS().toDF(), src, tmp)
    +    }
    +
    +    def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
    +      val file = Utils.tempFileWith(new File(tmp, "parquet"))
    +      df.write.parquet(file.getCanonicalPath)
    +      file.renameTo(new File(src, file.getName))
    +    }
    +  }
    +
    +  def createFileStream(
           format: String,
           path: String,
    -      schema: Option[StructType] = None): FileStreamSource = {
    -    val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
    +      schema: Option[StructType] = None): DataFrame = {
    +
         val reader =
           if (schema.isDefined) {
             sqlContext.read.format(format).schema(schema.get)
           } else {
             sqlContext.read.format(format)
           }
         reader.stream(path)
    -      .queryExecution.analyzed
    +  }
    +
    +  protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = 
{
    +    val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
    +    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
    --- End diff --
    
    nit: unused line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to