This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1bb0c8e  [SPARK-25348][SQL] Data source for binary files
1bb0c8e is described below

commit 1bb0c8e407e0fcd1283f0eb2f742ba2567eda87e
Author: WeichenXu <weichen...@databricks.com>
AuthorDate: Tue Apr 16 15:41:32 2019 -0700

    [SPARK-25348][SQL] Data source for binary files

    ## What changes were proposed in this pull request?

    Implement a binary file data source in Spark.

    Format name: "binaryFile" (case-insensitive)

    Schema:
    - content: BinaryType
    - status: StructType
      - path: StringType
      - modificationTime: TimestampType
      - length: LongType

    Options:
    * pathGlobFilter (instead of pathFilterRegex), to rely on GlobFilter behavior.
    * maxBytesPerPartition is not implemented, since partition sizing is already
      controlled by two SQL confs: maxPartitionBytes and openCostInBytes.

    ## How was this patch tested?

    Unit tests added.

    Please review http://spark.apache.org/contributing.html before opening a pull request.

    Closes #24354 from WeichenXu123/binary_file_datasource.

    Lead-authored-by: WeichenXu <weichen...@databricks.com>
    Co-authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 ...org.apache.spark.sql.sources.DataSourceRegister |   1 +
 .../datasources/binaryfile/BinaryFileFormat.scala  | 177 +++++++++++++++++++++
 .../binaryfile/BinaryFileFormatSuite.scala         | 143 +++++++++++++++++
 3 files changed, 321 insertions(+)
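For a quick sense of what this source produces, here is a minimal sketch of reading a
directory with it (the input path is hypothetical; the nested status struct and its
nullability follow the schema described above):

{{{
// Hypothetical input directory; any path readable through the Hadoop FileSystem API works.
val df = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")  // optional glob, matched against each file path
  .load("/tmp/images")

df.printSchema()
// root
//  |-- content: binary (nullable = true)
//  |-- status: struct (nullable = false)
//  |    |-- path: string (nullable = false)
//  |    |-- modificationTime: timestamp (nullable = false)
//  |    |-- length: long (nullable = false)
//
// Any partition columns discovered from the directory layout (e.g. year=2014/)
// would be appended after these fields.
}}}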
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index be9cb81..d988287 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -8,3 +8,4 @@ org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
new file mode 100644
index 0000000..ad9292a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.binaryfile
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path}
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * The binary file data source.
+ *
+ * It reads binary files and converts each file into a single record that contains the raw content
+ * and metadata of the file.
+ *
+ * Example:
+ * {{{
+ *   // Scala
+ *   val df = spark.read.format("binaryFile")
+ *     .option("pathGlobFilter", "*.png")
+ *     .load("/path/to/fileDir")
+ *
+ *   // Java
+ *   Dataset<Row> df = spark.read().format("binaryFile")
+ *     .option("pathGlobFilter", "*.png")
+ *     .load("/path/to/fileDir");
+ * }}}
+ */
+class BinaryFileFormat extends FileFormat with DataSourceRegister {
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = Some(BinaryFileFormat.schema)
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    throw new UnsupportedOperationException("Write is not supported for binary file data source")
+  }
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    false
+  }
+
+  override def shortName(): String = "binaryFile"
+
+  override protected def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
+
+    val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
+
+    (file: PartitionedFile) => {
+      val path = file.filePath
+      val fsPath = new Path(path)
+
+      // TODO: Improve performance: the glob pattern is recompiled for every file.
+      val globFilter = pathGlobPattern.map(new GlobFilter(_))
+      if (!globFilter.isDefined || globFilter.get.accept(fsPath)) {
+        val fs = fsPath.getFileSystem(broadcastedHadoopConf.value.value)
+        val fileStatus = fs.getFileStatus(fsPath)
+        val length = fileStatus.getLen()
+        val modificationTime = fileStatus.getModificationTime()
+        val stream = fs.open(fsPath)
+
+        val content = try {
+          ByteStreams.toByteArray(stream)
+        } finally {
+          Closeables.close(stream, true)
+        }
+
+        val fullOutput = dataSchema.map { f =>
+          AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
+        }
+        val requiredOutput = fullOutput.filter { a =>
+          requiredSchema.fieldNames.contains(a.name)
+        }
+
+        // TODO: Add column pruning: currently the file content is read even when the
+        // content column is not required.
+        val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)
+
+        val internalRow = InternalRow(
+          content,
+          InternalRow(
+            UTF8String.fromString(path),
+            DateTimeUtils.fromMillis(modificationTime),
+            length
+          )
+        )
+
+        Iterator(requiredColumns(internalRow))
+      } else {
+        Iterator.empty
+      }
+    }
+  }
+}
+
+object BinaryFileFormat {
+
+  private val fileStatusSchema = StructType(
+    StructField("path", StringType, false) ::
+    StructField("modificationTime", TimestampType, false) ::
+    StructField("length", LongType, false) :: Nil)
+
+  /**
+   * Schema for the binary file data source.
+   *
+   * Schema:
+   *  - content (BinaryType): The content of the file.
+   *  - status (StructType): The status of the file.
+   *    - path (StringType): The path of the file.
+   *    - modificationTime (TimestampType): The modification time of the file.
+   *      In some Hadoop FileSystem implementations, this might be unavailable and fall back
+   *      to a default value.
+   *    - length (LongType): The length of the file in bytes.
+   */
+  val schema = StructType(
+    StructField("content", BinaryType, true) ::
+    StructField("status", fileStatusSchema, false) :: Nil)
+}
+
+class BinaryFileSourceOptions(
+    @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  /**
+   * An optional glob pattern to only include files with paths matching the pattern.
+   * The syntax follows [[org.apache.hadoop.fs.GlobFilter]].
+   */
+  val pathGlobFilter: Option[String] = parameters.get("pathGlobFilter")
+}
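An aside on the first TODO above (per-file glob recompilation): a common Spark idiom is to
serialize only the pattern string and rebuild the compiled filter lazily on each executor,
so it is compiled once per deserialized task rather than once per file. A minimal sketch of
that idea — not part of this commit, and the helper name is hypothetical:

{{{
import org.apache.hadoop.fs.{GlobFilter, Path}

// Hypothetical helper: only the pattern string is serialized with the closure;
// the GlobFilter is rebuilt lazily after deserialization, once per task.
class SerializableGlobFilter(pattern: String) extends Serializable {
  @transient private lazy val filter = new GlobFilter(pattern)
  def accept(path: Path): Boolean = filter.accept(path)
}

// In buildReader, the filter could then be hoisted out of the per-file closure:
//   val globFilter = pathGlobPattern.map(new SerializableGlobFilter(_))
}}}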
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
new file mode 100644
index 0000000..090f417
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.binaryfile
+
+import java.io.File
+import java.nio.file.{Files, StandardOpenOption}
+import java.sql.Timestamp
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.fs.{FileSystem, GlobFilter, Path}
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.util.Utils
+
+class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+
+  private var testDir: String = _
+
+  private var fsTestDir: Path = _
+
+  private var fs: FileSystem = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    testDir = Utils.createTempDir().getAbsolutePath
+    fsTestDir = new Path(testDir)
+    fs = fsTestDir.getFileSystem(sparkContext.hadoopConfiguration)
+
+    val year2014Dir = new File(testDir, "year=2014")
+    year2014Dir.mkdir()
+    val year2015Dir = new File(testDir, "year=2015")
+    year2015Dir.mkdir()
+
+    Files.write(
+      new File(year2014Dir, "data.txt").toPath,
+      Seq("2014-test").asJava,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+    Files.write(
+      new File(year2014Dir, "data2.bin").toPath,
+      "2014-test-bin".getBytes,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+
+    Files.write(
+      new File(year2015Dir, "bool.csv").toPath,
+      Seq("bool", "True", "False", "true").asJava,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+    Files.write(
+      new File(year2015Dir, "data.txt").toPath,
+      "2015-test".getBytes,
+      StandardOpenOption.CREATE, StandardOpenOption.WRITE
+    )
+  }
+
+  def testBinaryFileDataSource(pathGlobFilter: String): Unit = {
+    val resultDF = spark.read.format("binaryFile")
+      .option("pathGlobFilter", pathGlobFilter)
+      .load(testDir)
+      .select(
+        col("status.path"),
+        col("status.modificationTime"),
+        col("status.length"),
+        col("content"),
+        col("year") // this is a partition column
+      )
+
+    val expectedRowSet = new collection.mutable.HashSet[Row]()
+
+    val globFilter = new GlobFilter(pathGlobFilter)
+    for (partitionDirStatus <- fs.listStatus(fsTestDir)) {
+      val dirPath = partitionDirStatus.getPath
+
+      val partitionName = dirPath.getName.split("=")(1)
+      val year = partitionName.toInt // the partition column "year" has Int type
+
+      for (fileStatus <- fs.listStatus(dirPath)) {
+        if (globFilter.accept(fileStatus.getPath)) {
+          val fpath = fileStatus.getPath.toString.replace("file:/", "file:///")
+          val flen = fileStatus.getLen
+          val modificationTime = new Timestamp(fileStatus.getModificationTime)
+
+          val fcontent = {
+            val stream = fs.open(fileStatus.getPath)
+            val content = try {
+              ByteStreams.toByteArray(stream)
+            } finally {
+              Closeables.close(stream, true)
+            }
+            content
+          }
+
+          val row = Row(fpath, modificationTime, flen, fcontent, year)
+          expectedRowSet.add(row)
+        }
+      }
+    }
+
+    checkAnswer(resultDF, expectedRowSet.toSeq)
+  }
+
+  test("binary file data source test") {
+    testBinaryFileDataSource(pathGlobFilter = "*.*")
+    testBinaryFileDataSource(pathGlobFilter = "*.bin")
+    testBinaryFileDataSource(pathGlobFilter = "*.txt")
+    testBinaryFileDataSource(pathGlobFilter = "*.{txt,csv}")
+    testBinaryFileDataSource(pathGlobFilter = "*.json")
+  }
+
+  test("binary file data source does not support write operation") {
+    val df = spark.read.format("binaryFile").load(testDir)
+    withTempDir { tmpDir =>
+      val thrown = intercept[UnsupportedOperationException] {
+        df.write
+          .format("binaryFile")
+          .save(tmpDir + "/test_save")
+      }
+      assert(thrown.getMessage.contains("Write is not supported for binary file data source"))
+    }
+  }
+
+}
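Tying the pieces together, the suite exercises both the glob option and partition discovery;
a hedged spark-shell sketch of the same flow follows (the directory path is hypothetical,
laid out with year=2014/ and year=2015/ subdirectories as in the fixture above):

{{{
val df = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.txt")
  .load("/tmp/testDir")  // hypothetical dir containing year=.../ subdirectories

df.select("status.path", "status.length", "year").show()

// Writes are rejected by prepareWrite:
// df.write.format("binaryFile").save("/tmp/out")
//   => UnsupportedOperationException: Write is not supported for binary file data source
}}}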