Repository: spark Updated Branches: refs/heads/branch-1.4 a9d84a9bf -> d2328137f
http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index ca53dcd..5e010d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -17,11 +17,19 @@ package org.apache.spark.sql.sources -import org.apache.spark.annotation.{Experimental, DeveloperApi} +import scala.util.Try + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} + +import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SaveMode, DataFrame, Row, SQLContext} -import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, _} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection +import org.apache.spark.sql.types.{StructField, StructType} /** * ::DeveloperApi:: @@ -78,6 +86,41 @@ trait SchemaRelationProvider { schema: StructType): BaseRelation } +/** + * ::DeveloperApi:: + * Implemented by objects that produce relations for a specific kind of data source + * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a + * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined + * schema, and an optional list of partition columns, this interface is used to pass in the + * parameters specified by a user. + * + * Users may specify the fully qualified class name of a given data source. When that class is + * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for + * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the + * data source 'org.apache.spark.sql.json.DefaultSource' + * + * A new instance of this class with be instantiated each time a DDL call is made. + * + * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is + * that users need to provide a schema and a (possibly empty) list of partition columns when + * using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]], + * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified + * schemas, and accessing partitioned relations. + */ +trait FSBasedRelationProvider { + /** + * Returns a new base relation with the given parameters, a user defined schema, and a list of + * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity + * is enforced by the Map that is passed to the function. + */ + def createRelation( + sqlContext: SQLContext, + paths: Array[String], + schema: Option[StructType], + partitionColumns: Option[StructType], + parameters: Map[String, String]): FSBasedRelation +} + @DeveloperApi trait CreatableRelationProvider { /** @@ -207,3 +250,235 @@ trait InsertableRelation { trait CatalystScan { def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] } + +/** + * ::Experimental:: + * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the + * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. + * An [[OutputWriter]] instance is created and initialized when a new output file is opened on + * executor side. This instance is used to persist rows to this single output file. + */ +@Experimental +abstract class OutputWriter { + /** + * Initializes this [[OutputWriter]] before any rows are persisted. + * + * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that + * this may not point to the final output file. For example, `FileOutputFormat` writes to + * temporary directories and then merge written files back to the final destination. In + * this case, `path` points to a temporary output file under the temporary directory. + * @param dataSchema Schema of the rows to be written. Partition columns are not included in the + * schema if the corresponding relation is partitioned. + * @param context The Hadoop MapReduce task context. + */ + def init( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): Unit = () + + /** + * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned + * tables, dynamic partition columns are not included in rows to be written. + */ + def write(row: Row): Unit + + /** + * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before + * the task output is committed. + */ + def close(): Unit +} + +/** + * ::Experimental:: + * A [[BaseRelation]] that provides much of the common code required for formats that store their + * data to an HDFS compatible filesystem. + * + * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and + * filter using selected predicates before producing an RDD containing all matching tuples as + * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file + * systems, it's able to discover partitioning information from the paths of input directories, and + * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must + * override one of the three `buildScan` methods to implement the read path. + * + * For the write path, it provides the ability to write to both non-partitioned and partitioned + * tables. Directory layout of the partitioned tables is compatible with Hive. + * + * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for + * implementing metastore table conversion. + * @param paths Base paths of this relation. For partitioned relations, it should be the root + * directories of all partition directories. + * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional + * [[PartitionSpec]], so that partition discovery can be skipped. + */ +@Experimental +abstract class FSBasedRelation private[sql]( + val paths: Array[String], + maybePartitionSpec: Option[PartitionSpec]) + extends BaseRelation { + + /** + * Constructs an [[FSBasedRelation]]. + * + * @param paths Base paths of this relation. For partitioned relations, it should be either root + * directories of all partition directories. + * @param partitionColumns Partition columns of this relation. + */ + def this(paths: Array[String], partitionColumns: StructType) = + this(paths, { + if (partitionColumns.isEmpty) None + else Some(PartitionSpec(partitionColumns, Array.empty[Partition])) + }) + + /** + * Constructs an [[FSBasedRelation]]. + * + * @param paths Base paths of this relation. For partitioned relations, it should be root + * directories of all partition directories. + */ + def this(paths: Array[String]) = this(paths, None) + + private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + + private val codegenEnabled = sqlContext.conf.codegenEnabled + + private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec => + spec.copy(partitionColumns = spec.partitionColumns.asNullable) + }.getOrElse { + if (sqlContext.conf.partitionDiscoveryEnabled()) { + discoverPartitions() + } else { + PartitionSpec(StructType(Nil), Array.empty[Partition]) + } + } + + private[sql] def partitionSpec: PartitionSpec = _partitionSpec + + /** + * Partition columns. Note that they are always nullable. + */ + def partitionColumns: StructType = partitionSpec.partitionColumns + + private[sql] def refresh(): Unit = { + if (sqlContext.conf.partitionDiscoveryEnabled()) { + _partitionSpec = discoverPartitions() + } + } + + private def discoverPartitions(): PartitionSpec = { + val basePaths = paths.map(new Path(_)) + val leafDirs = basePaths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory))) + .filter(_.isDir) + .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _)) + .getOrElse(Seq.empty[FileStatus]) + }.map(_.getPath) + + if (leafDirs.nonEmpty) { + PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__") + } else { + PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition]) + } + } + + /** + * Schema of this relation. It consists of columns appearing in [[dataSchema]] and all partition + * columns not appearing in [[dataSchema]]. + */ + override lazy val schema: StructType = { + val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet + StructType(dataSchema ++ partitionSpec.partitionColumns.filterNot { column => + dataSchemaColumnNames.contains(column.name.toLowerCase) + }) + } + + /** + * Specifies schema of actual data files. For partitioned relations, if one or more partitioned + * columns are contained in the data files, they should also appear in `dataSchema`. + */ + def dataSchema: StructType + + /** + * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within + * this relation. For partitioned relations, this method is called for each selected partition, + * and builds an `RDD[Row]` containing all rows within that single partition. + * + * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the + * relation. For a partitioned relation, it contains paths of all data files in a single + * selected partition. + */ + def buildScan(inputPaths: Array[String]): RDD[Row] = { + throw new RuntimeException( + "At least one buildScan() method should be overridden to read the relation.") + } + + /** + * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within + * this relation. For partitioned relations, this method is called for each selected partition, + * and builds an `RDD[Row]` containing all rows within that single partition. + * + * @param requiredColumns Required columns. + * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the + * relation. For a partitioned relation, it contains paths of all data files in a single + * selected partition. + */ + def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = { + // Yeah, to workaround serialization... + val dataSchema = this.dataSchema + val codegenEnabled = this.codegenEnabled + + val requiredOutput = requiredColumns.map { col => + val field = dataSchema(col) + BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable) + }.toSeq + + buildScan(inputPaths).mapPartitions { rows => + val buildProjection = if (codegenEnabled) { + GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes) + } else { + () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes) + } + + val mutableProjection = buildProjection() + rows.map(mutableProjection) + } + } + + /** + * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within + * this relation. For partitioned relations, this method is called for each selected partition, + * and builds an `RDD[Row]` containing all rows within that single partition. + * + * @param requiredColumns Required columns. + * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction + * of all `filters`. The pushed down filters are currently purely an optimization as they + * will all be evaluated again. This means it is safe to use them with methods that produce + * false positives such as filtering partitions based on a bloom filter. + * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the + * relation. For a partitioned relation, it contains paths of all data files in a single + * selected partition. + */ + def buildScan( + requiredColumns: Array[String], + filters: Array[Filter], + inputPaths: Array[String]): RDD[Row] = { + buildScan(requiredColumns, inputPaths) + } + + /** + * Client side preparation for data writing can be put here. For example, user defined output + * committer can be configured here. + * + * Note that the only side effect expected here is mutating `job` via its setters. Especially, + * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states + * may cause unexpected behaviors. + */ + def prepareForWrite(job: Job): Unit = () + + /** + * This method is responsible for producing a new [[OutputWriter]] for each newly opened output + * file on the executor side. + */ + def outputWriterClass: Class[_ <: OutputWriter] +} http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala index 6ed68d1..aad1d24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala @@ -101,13 +101,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } } - case i @ logical.InsertIntoTable( - l: LogicalRelation, partition, query, overwrite, ifNotExists) - if !l.isInstanceOf[InsertableRelation] => + case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK + case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK + case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") - case CreateTableUsingAsSelect(tableName, _, _, SaveMode.Overwrite, _, query) => + case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. if (catalog.tableExists(Seq(tableName))) { http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index b7561ce..bea568e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -21,7 +21,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.parquet.ParquetRelation2._ +import org.apache.spark.sql.sources.PartitioningUtils._ +import org.apache.spark.sql.sources.{Partition, PartitionSpec} import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.types._ import org.apache.spark.sql.{QueryTest, Row, SQLContext} http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 54f2f3c..4e54b2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.sources -import java.io.{IOException, File} +import java.io.{File, IOException} -import org.apache.spark.sql.AnalysisException import org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql.AnalysisException import org.apache.spark.util.Utils class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index bbf48ef..d754c8e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -19,25 +19,24 @@ package org.apache.spark.sql.hive import com.google.common.base.Objects import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} - import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.metastore.Warehouse +import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.Logging -import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext} -import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Catalog, OverrideCatalog} +import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec} -import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, ResolvedDataSource} +import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode} import org.apache.spark.util.Utils /* Implicit conversions */ @@ -98,6 +97,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive ResolvedDataSource( hive, userSpecifiedSchema, + Array.empty[String], table.properties("spark.sql.sources.provider"), options) @@ -438,6 +438,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive desc.name, hive.conf.defaultDataSourceName, temporary = false, + Array.empty[String], mode, options = Map.empty[String, String], child http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index be9249a..d46a127 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -221,14 +221,14 @@ private[hive] trait HiveStrategies { object HiveDDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case CreateTableUsing( - tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) => + tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) => ExecutedCommand( CreateMetastoreDataSource( tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil - case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) => + case CreateTableUsingAsSelect(tableName, provider, false, partitionCols, mode, opts, query) => val cmd = - CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query) + CreateMetastoreDataSourceAsSelect(tableName, provider, partitionCols, mode, opts, query) ExecutedCommand(cmd) :: Nil case _ => Nil http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index abab1a2..8e405e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -158,6 +158,7 @@ private[hive] case class CreateMetastoreDataSourceAsSelect( tableName: String, provider: String, + partitionColumns: Array[String], mode: SaveMode, options: Map[String, String], query: LogicalPlan) extends RunnableCommand { @@ -189,12 +190,12 @@ case class CreateMetastoreDataSourceAsSelect( return Seq.empty[Row] case SaveMode.Append => // Check if the specified data source match the data source of the existing table. - val resolved = - ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath) + val resolved = ResolvedDataSource( + sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match { - case l @ LogicalRelation(i: InsertableRelation) => - if (i != createdRelation.relation) { + case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) => + if (l.relation != createdRelation.relation) { val errorDescription = s"Cannot append to table $tableName because the resolved relation does not " + s"match the existing relation of $tableName. " + @@ -202,14 +203,13 @@ case class CreateMetastoreDataSourceAsSelect( s"table $tableName and using its data source and options." val errorMessage = s""" - |$errorDescription - |== Relations == - |${sideBySide( - s"== Expected Relation ==" :: - l.toString :: Nil, - s"== Actual Relation ==" :: - createdRelation.toString :: Nil).mkString("\n")} - """.stripMargin + |$errorDescription + |== Relations == + |${sideBySide( + s"== Expected Relation ==" :: l.toString :: Nil, + s"== Actual Relation ==" :: createdRelation.toString :: Nil + ).mkString("\n")} + """.stripMargin throw new AnalysisException(errorMessage) } existingSchema = Some(l.schema) @@ -234,7 +234,8 @@ case class CreateMetastoreDataSourceAsSelect( } // Create the relation based on the data of df. - val resolved = ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df) + val resolved = + ResolvedDataSource(sqlContext, provider, partitionColumns, mode, optionsWithPath, df) if (createMetastoreTable) { // We will use the schema of resolved.relation as the schema of the table (instead of http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 8398da2..cbc381c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -204,7 +204,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( if (string == null || string.isEmpty) { defaultPartName } else { - FileUtils.escapePathName(string) + FileUtils.escapePathName(string, defaultPartName) } s"/$col=$colString" }.mkString http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala new file mode 100644 index 0000000..415b1cd --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.parquet.ParquetTest +import org.apache.spark.sql.types._ + +// TODO Don't extend ParquetTest +// This test suite extends ParquetTest for some convenient utility methods. These methods should be +// moved to some more general places, maybe QueryTest. +class FSBasedRelationSuite extends QueryTest with ParquetTest { + override val sqlContext: SQLContext = TestHive + + import sqlContext._ + import sqlContext.implicits._ + + val dataSchema = + StructType( + Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = false))) + + val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") + + val partitionedTestDF1 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF2 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2) + + def checkQueries(df: DataFrame): Unit = { + // Selects everything + checkAnswer( + df, + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + + // Simple filtering and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 === 2), + for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) + + // Simple projection and filtering + checkAnswer( + df.filter('a > 1).select('b, 'a + 1), + for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) + + // Simple projection and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 < 2).select('b, 'p1), + for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) + + // Self-join + df.registerTempTable("t") + withTempTable("t") { + checkAnswer( + sql( + """SELECT l.a, r.b, l.p1, r.p2 + |FROM t l JOIN t r + |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 + """.stripMargin), + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + } + } + + test("save()/load() - non-partitioned table - Overwrite") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + testDF.collect()) + } + } + + test("save()/load() - non-partitioned table - Append") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)).orderBy("a"), + testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("save()/load() - non-partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("save()/load() - non-partitioned table - Ignore") { + withTempDir { file => + testDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + assert(fs.listStatus(path).isEmpty) + } + } + + test("save()/load() - partitioned table - simple queries") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkQueries( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json))) + } + } + + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1") + .saveAsTextFile(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchemaWithPartition.json))) + } + } + + test("save()/load() - partitioned table - Overwrite") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - Append") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("save()/load() - partitioned table - Append - new partition values") { + withTempPath { file => + partitionedTestDF1.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("save()/load() - partitioned table - Ignore") { + withTempDir { file => + partitionedTestDF.save( + path = file.getCanonicalPath, + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(SparkHadoopUtil.get.conf) + assert(fs.listStatus(path).isEmpty) + } + } + + def withTable(tableName: String)(f: => Unit): Unit = { + try f finally sql(s"DROP TABLE $tableName") + } + + test("saveAsTable()/load() - non-partitioned table - Overwrite") { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkAnswer(table("t"), testDF.collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - Append") { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite) + + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append) + + withTable("t") { + checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("saveAsTable()/load() - non-partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + testDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore) + + assert(table("t").collect().isEmpty) + } + } + + test("saveAsTable()/load() - partitioned table - simple queries") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkQueries(table("t")) + } + } + + test("saveAsTable()/load() - partitioned table - Overwrite") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - new partition values") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + // Using only a subset of all partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1")) + } + + // Using different order of partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p2", "p1")) + } + } + + test("saveAsTable()/load() - partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.ErrorIfExists, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("saveAsTable()/load() - partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Ignore, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + assert(table("t").collect().isEmpty) + } + } + + test("Hadoop style globbing") { + withTempPath { file => + partitionedTestDF.save( + source = classOf[SimpleTextSource].getCanonicalName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + val df = load( + source = classOf[SimpleTextSource].getCanonicalName, + options = Map( + "path" -> s"${file.getCanonicalPath}/p1=*/p2=???", + "dataSchema" -> dataSchema.json)) + + val expectedPaths = Set( + s"${file.getCanonicalFile}/p1=1/p2=foo", + s"${file.getCanonicalFile}/p1=2/p2=foo", + s"${file.getCanonicalFile}/p1=1/p2=bar", + s"${file.getCanonicalFile}/p1=2/p2=bar" + ).map { p => + val path = new Path(p) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString + } + + println(df.queryExecution) + + val actualPaths = df.queryExecution.analyzed.collectFirst { + case LogicalRelation(relation: FSBasedRelation) => + relation.paths.toSet + }.getOrElse { + fail("Expect an FSBasedRelation, but none could be found") + } + + assert(actualPaths === expectedPaths) + checkAnswer(df, partitionedTestDF.collect()) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d2328137/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala new file mode 100644 index 0000000..8801aba --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.text.NumberFormat +import java.util.UUID + +import com.google.common.base.Objects +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{NullWritable, Text} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} +import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.{Row, SQLContext} + +/** + * A simple example [[FSBasedRelationProvider]]. + */ +class SimpleTextSource extends FSBasedRelationProvider { + override def createRelation( + sqlContext: SQLContext, + paths: Array[String], + schema: Option[StructType], + partitionColumns: Option[StructType], + parameters: Map[String, String]): FSBasedRelation = { + val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField])) + new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext) + } +} + +class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] { + val numberFormat = NumberFormat.getInstance() + + numberFormat.setMinimumIntegerDigits(5) + numberFormat.setGroupingUsed(false) + + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val split = context.getTaskAttemptID.getTaskID.getId + val name = FileOutputFormat.getOutputName(context) + new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}") + } +} + +class SimpleTextOutputWriter extends OutputWriter { + private var recordWriter: RecordWriter[NullWritable, Text] = _ + private var taskAttemptContext: TaskAttemptContext = _ + + override def init( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): Unit = { + recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) + taskAttemptContext = context + } + + override def write(row: Row): Unit = { + val serialized = row.toSeq.map(_.toString).mkString(",") + recordWriter.write(null, new Text(serialized)) + } + + override def close(): Unit = recordWriter.close(taskAttemptContext) +} + +/** + * A simple example [[FSBasedRelation]], used for testing purposes. Data are stored as comma + * separated string lines. When scanning data, schema must be explicitly provided via data source + * option `"dataSchema"`. + */ +class SimpleTextRelation( + paths: Array[String], + val maybeDataSchema: Option[StructType], + partitionsSchema: StructType, + parameters: Map[String, String])( + @transient val sqlContext: SQLContext) + extends FSBasedRelation(paths, partitionsSchema) { + + import sqlContext.sparkContext + + override val dataSchema: StructType = + maybeDataSchema.getOrElse(DataType.fromJson(parameters("dataSchema")).asInstanceOf[StructType]) + + override def equals(other: Any): Boolean = other match { + case that: SimpleTextRelation => + this.paths.sameElements(that.paths) && + this.maybeDataSchema == that.maybeDataSchema && + this.dataSchema == that.dataSchema && + this.partitionColumns == that.partitionColumns + + case _ => false + } + + override def hashCode(): Int = + Objects.hashCode(paths, maybeDataSchema, dataSchema) + + override def outputWriterClass: Class[_ <: OutputWriter] = + classOf[SimpleTextOutputWriter] + + override def buildScan(inputPaths: Array[String]): RDD[Row] = { + val fields = dataSchema.map(_.dataType) + + sparkContext.textFile(inputPaths.mkString(",")).map { record => + Row(record.split(",").zip(fields).map { case (value, dataType) => + Cast(Literal(value), dataType).eval() + }: _*) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org