This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb0c8e  [SPARK-25348][SQL] Data source for binary files
1bb0c8e is described below

commit 1bb0c8e407e0fcd1283f0eb2f742ba2567eda87e
Author: WeichenXu <weichen...@databricks.com>
AuthorDate: Tue Apr 16 15:41:32 2019 -0700

    [SPARK-25348][SQL] Data source for binary files
    
    ## What changes were proposed in this pull request?
    
    Implement binary file data source in Spark.
    
    Format name: "binaryFile" (case-insensitive)
    
    Schema:
    - content: BinaryType
    - status: StructType
      - path: StringType
      - modificationTime: TimestampType
      - length: LongType
    
    Options:
    * pathGlobFilter (instead of pathFilterRegex) to rely on GlobFilter behavior
    * maxBytesPerPartition is not implemented since partition sizes are already controlled by two
      SQL confs: maxPartitionBytes and openCostInBytes.
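    
    For illustration only (not part of this patch), a minimal read using the new format, assuming
    an active SparkSession named `spark` and a placeholder input directory, might look like:
    
        val df = spark.read.format("binaryFile")
          .option("pathGlobFilter", "*.png")  // optional; Hadoop GlobFilter syntax
          .load("/path/to/fileDir")           // placeholder path
        // Project the raw bytes and the file metadata fields described above.
        df.select("content", "status.path", "status.modificationTime", "status.length").show()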
    
    ## How was this patch tested?
    
    Unit test added.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #24354 from WeichenXu123/binary_file_datasource.
    
    Lead-authored-by: WeichenXu <weichen...@databricks.com>
    Co-authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 ...org.apache.spark.sql.sources.DataSourceRegister |   1 +
 .../datasources/binaryfile/BinaryFileFormat.scala  | 177 +++++++++++++++++++++
 .../binaryfile/BinaryFileFormatSuite.scala         | 143 +++++++++++++++++
 3 files changed, 321 insertions(+)

diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index be9cb81..d988287 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -8,3 +8,4 @@ org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
new file mode 100644
index 0000000..ad9292a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.binaryfile
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * The binary file data source.
+ *
+ * It reads binary files and converts each file into a single record that contains the raw content
+ * and metadata of the file.
+ *
+ * Example:
+ * {{{
+ *   // Scala
+ *   val df = spark.read.format("binaryFile")
+ *     .option("pathGlobFilter", "*.png")
+ *     .load("/path/to/fileDir")
+ *
+ *   // Java
+ *   Dataset<Row> df = spark.read().format("binaryFile")
+ *     .option("pathGlobFilter", "*.png")
+ *     .load("/path/to/fileDir");
+ * }}}
+ */
+class BinaryFileFormat extends FileFormat with DataSourceRegister {
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = Some(BinaryFileFormat.schema)
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    throw new UnsupportedOperationException("Write is not supported for binary file data source")
+  }
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    false
+  }
+
+  override def shortName(): String = "binaryFile"
+
+  override protected def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
+
+    val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
+
+    (file: PartitionedFile) => {
+      val path = file.filePath
+      val fsPath = new Path(path)
+
+      // TODO: Improve performance: the glob pattern is recompiled here for each file.
+      val globFilter = pathGlobPattern.map(new GlobFilter(_))
+      if (!globFilter.isDefined || globFilter.get.accept(fsPath)) {
+        val fs = fsPath.getFileSystem(broadcastedHadoopConf.value.value)
+        val fileStatus = fs.getFileStatus(fsPath)
+        val length = fileStatus.getLen()
+        val modificationTime = fileStatus.getModificationTime()
+        val stream = fs.open(fsPath)
+
+        val content = try {
+          ByteStreams.toByteArray(stream)
+        } finally {
+          Closeables.close(stream, true)
+        }
+
+        val fullOutput = dataSchema.map { f =>
+          AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
+        }
+        val requiredOutput = fullOutput.filter { a =>
+          requiredSchema.fieldNames.contains(a.name)
+        }
+
+        // TODO: Add column pruning
+        // Currently it still reads the file content even if the content column is not required.
+        val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)
+
+        val internalRow = InternalRow(
+          content,
+          InternalRow(
+            UTF8String.fromString(path),
+            DateTimeUtils.fromMillis(modificationTime),
+            length
+          )
+        )
+
+        Iterator(requiredColumns(internalRow))
+      } else {
+        Iterator.empty
+      }
+    }
+  }
+}
+
+object BinaryFileFormat {
+
+  private val fileStatusSchema = StructType(
+    StructField("path", StringType, false) ::
+      StructField("modificationTime", TimestampType, false) ::
+      StructField("length", LongType, false) :: Nil)
+
+  /**
+   * Schema for the binary file data source.
+   *
+   * Schema:
+   *  - content (BinaryType): The content of the file.
+   *  - status (StructType): The status of the file.
+   *    - path (StringType): The path of the file.
+   *    - modificationTime (TimestampType): The modification time of the file.
+   *      In some Hadoop FileSystem implementations, this might be unavailable and fall back to
+   *      some default value.
+   *    - length (LongType): The length of the file in bytes.
+   */
+  val schema = StructType(
+    StructField("content", BinaryType, true) ::
+      StructField("status", fileStatusSchema, false) :: Nil)
+}
+
+class BinaryFileSourceOptions(
+    @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  /**
+   * An optional glob pattern to only include files with paths matching the pattern.
+   * The syntax follows [[org.apache.hadoop.fs.GlobFilter]].
+   */
+  val pathGlobFilter: Option[String] = parameters.get("pathGlobFilter")
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
new file mode 100644
index 0000000..090f417
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.binaryfile
+
+import java.io.File
+import java.nio.file.{Files, StandardOpenOption}
+import java.sql.Timestamp
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.fs.{FileSystem, GlobFilter, Path}
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.util.Utils
+
+class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+
+  private var testDir: String = _
+
+  private var fsTestDir: Path = _
+
+  private var fs: FileSystem = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    testDir = Utils.createTempDir().getAbsolutePath
+    fsTestDir = new Path(testDir)
+    fs = fsTestDir.getFileSystem(sparkContext.hadoopConfiguration)
+
+    val year2014Dir = new File(testDir, "year=2014")
+    year2014Dir.mkdir()
+    val year2015Dir = new File(testDir, "year=2015")
+    year2015Dir.mkdir()
+
+    Files.write(
+      new File(year2014Dir, "data.txt").toPath,
+      Seq("2014-test").asJava,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+    Files.write(
+      new File(year2014Dir, "data2.bin").toPath,
+      "2014-test-bin".getBytes,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+
+    Files.write(
+      new File(year2015Dir, "bool.csv").toPath,
+      Seq("bool", "True", "False", "true").asJava,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+    Files.write(
+      new File(year2015Dir, "data.txt").toPath,
+      "2015-test".getBytes,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+  }
+
+  def testBinaryFileDataSource(pathGlobFilter: String): Unit = {
+    val resultDF = spark.read.format("binaryFile")
+      .option("pathGlobFilter", pathGlobFilter)
+      .load(testDir)
+      .select(
+        col("status.path"),
+        col("status.modificationTime"),
+        col("status.length"),
+        col("content"),
+        col("year") // this is a partition column
+      )
+
+    val expectedRowSet = new collection.mutable.HashSet[Row]()
+
+    val globFilter = new GlobFilter(pathGlobFilter)
+    for (partitionDirStatus <- fs.listStatus(fsTestDir)) {
+      val dirPath = partitionDirStatus.getPath
+
+      val partitionName = dirPath.getName.split("=")(1)
+      val year = partitionName.toInt // partition column "year" value, which is of Int type
+
+      for (fileStatus <- fs.listStatus(dirPath)) {
+        if (globFilter.accept(fileStatus.getPath)) {
+          val fpath = fileStatus.getPath.toString.replace("file:/", "file:///")
+          val flen = fileStatus.getLen
+          val modificationTime = new Timestamp(fileStatus.getModificationTime)
+
+          val fcontent = {
+            val stream = fs.open(fileStatus.getPath)
+            val content = try {
+              ByteStreams.toByteArray(stream)
+            } finally {
+              Closeables.close(stream, true)
+            }
+            content
+          }
+
+          val row = Row(fpath, modificationTime, flen, fcontent, year)
+          expectedRowSet.add(row)
+        }
+      }
+    }
+
+    checkAnswer(resultDF, expectedRowSet.toSeq)
+  }
+
+  test("binary file data source test") {
+    testBinaryFileDataSource(pathGlobFilter = "*.*")
+    testBinaryFileDataSource(pathGlobFilter = "*.bin")
+    testBinaryFileDataSource(pathGlobFilter = "*.txt")
+    testBinaryFileDataSource(pathGlobFilter = "*.{txt,csv}")
+    testBinaryFileDataSource(pathGlobFilter = "*.json")
+  }
+
+  test ("binary file data source do not support write operation") {
+    val df = spark.read.format("binaryFile").load(testDir)
+    withTempDir { tmpDir =>
+      val thrown = intercept[UnsupportedOperationException] {
+        df.write
+          .format("binaryFile")
+          .save(tmpDir + "/test_save")
+      }
+      assert(thrown.getMessage.contains("Write is not supported for binary file data source"))
+    }
+  }
+
+}

