Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/7696#discussion_r35666265
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala ---
@@ -17,208 +17,153 @@
package org.apache.spark.sql.json
-import java.io.IOException
-
-import org.apache.hadoop.fs.{FileSystem, Path}
+import java.io.CharArrayWriter
+
+import com.fasterxml.jackson.core.JsonFactory
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{Text, LongWritable, NullWritable}
+import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
-
+import org.apache.spark.sql.{Row, SQLContext}
-private[sql] class DefaultSource
- extends RelationProvider
- with SchemaRelationProvider
- with CreatableRelationProvider {
-
- private def checkPath(parameters: Map[String, String]): String = {
- parameters.getOrElse("path", sys.error("'path' must be specified for
json data."))
- }
-
- /** Constraints to be imposed on dataframe to be stored. */
- private def checkConstraints(data: DataFrame): Unit = {
- if (data.schema.fieldNames.length !=
data.schema.fieldNames.distinct.length) {
- val duplicateColumns =
data.schema.fieldNames.groupBy(identity).collect {
- case (x, ys) if ys.length > 1 => "\"" + x + "\""
- }.mkString(", ")
- throw new AnalysisException(s"Duplicate column(s) :
$duplicateColumns found, " +
- s"cannot save to JSON format")
- }
- }
-
- /** Returns a new base relation with the parameters. */
+private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
sqlContext: SQLContext,
- parameters: Map[String, String]): BaseRelation = {
- val path = checkPath(parameters)
+ paths: Array[String],
+ dataSchema: Option[StructType],
+ partitionColumns: Option[StructType],
+ parameters: Map[String, String]): HadoopFsRelation = {
val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
- new JSONRelation(path, samplingRatio, None, sqlContext)
+ new JSONRelation(None, samplingRatio, dataSchema, None,
partitionColumns, paths)(sqlContext)
}
+}
- /** Returns a new base relation with the given schema and parameters. */
- override def createRelation(
- sqlContext: SQLContext,
- parameters: Map[String, String],
- schema: StructType): BaseRelation = {
- val path = checkPath(parameters)
- val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
+private[sql] class JSONRelation(
+ val inputRDD: Option[RDD[String]],
+ val samplingRatio: Double,
+ val maybeDataSchema: Option[StructType],
+ val maybePartitionSpec: Option[PartitionSpec],
+ override val userDefinedPartitionColumns: Option[StructType],
+ override val paths: Array[String] = Array.empty[String])(@transient
val sqlContext: SQLContext)
+ extends HadoopFsRelation(maybePartitionSpec) {
- new JSONRelation(path, samplingRatio, Some(schema), sqlContext)
- }
+ override val needConversion: Boolean = false
- override def createRelation(
- sqlContext: SQLContext,
- mode: SaveMode,
- parameters: Map[String, String],
- data: DataFrame): BaseRelation = {
- // check if dataframe satisfies the constraints
- // before moving forward
- checkConstraints(data)
-
- val path = checkPath(parameters)
- val filesystemPath = new Path(path)
- val fs =
filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val doSave = if (fs.exists(filesystemPath)) {
- mode match {
- case SaveMode.Append =>
- sys.error(s"Append mode is not supported by
${this.getClass.getCanonicalName}")
- case SaveMode.Overwrite => {
- JSONRelation.delete(filesystemPath, fs)
- true
- }
- case SaveMode.ErrorIfExists =>
- sys.error(s"path $path already exists.")
- case SaveMode.Ignore => false
- }
- } else {
- true
- }
- if (doSave) {
- // Only save data when the save mode is not ignore.
- data.toJSON.saveAsTextFile(path)
- }
+ private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = {
+ val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = job.getConfiguration
- createRelation(sqlContext, parameters, data.schema)
- }
-}
+ val paths = inputPaths.map(_.getPath)
-private[sql] class JSONRelation(
- // baseRDD is not immutable with respect to INSERT OVERWRITE
- // and so it must be recreated at least as often as the
- // underlying inputs are modified. To be safe, a function is
- // used instead of a regular RDD value to ensure a fresh RDD is
- // recreated for each and every operation.
- baseRDD: () => RDD[String],
- val path: Option[String],
- val samplingRatio: Double,
- userSpecifiedSchema: Option[StructType])(
- @transient val sqlContext: SQLContext)
- extends BaseRelation
- with TableScan
- with InsertableRelation
- with CatalystScan {
-
- def this(
- path: String,
- samplingRatio: Double,
- userSpecifiedSchema: Option[StructType],
- sqlContext: SQLContext) =
- this(
- () => sqlContext.sparkContext.textFile(path),
- Some(path),
- samplingRatio,
- userSpecifiedSchema)(sqlContext)
-
- /** Constraints to be imposed on dataframe to be stored. */
- private def checkConstraints(data: DataFrame): Unit = {
- if (data.schema.fieldNames.length !=
data.schema.fieldNames.distinct.length) {
- val duplicateColumns =
data.schema.fieldNames.groupBy(identity).collect {
- case (x, ys) if ys.length > 1 => "\"" + x + "\""
- }.mkString(", ")
- throw new AnalysisException(s"Duplicate column(s) :
$duplicateColumns found, " +
- s"cannot save to JSON format")
+ if (paths.nonEmpty) {
+ FileInputFormat.setInputPaths(job, paths: _*)
}
- }
- override val needConversion: Boolean = false
+ sqlContext.sparkContext.hadoopRDD(
+ conf.asInstanceOf[JobConf],
+ classOf[TextInputFormat],
+ classOf[LongWritable],
+ classOf[Text]).map(_._2.toString) // get the text line
+ }
- override lazy val schema = userSpecifiedSchema.getOrElse {
+ override lazy val dataSchema = maybeDataSchema.getOrElse {
+ val files = cachedLeafStatuses().filterNot { status =>
+ val name = status.getPath.getName
+ name.startsWith("_") || name.startsWith(".")
+ }.toArray
InferSchema(
- baseRDD(),
+ inputRDD.getOrElse(createBaseRdd(files)),
samplingRatio,
sqlContext.conf.columnNameOfCorruptRecord)
}
- override def buildScan(): RDD[Row] = {
- // Rely on type erasure hack to pass RDD[InternalRow] back as RDD[Row]
+ override def buildScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputPaths: Array[FileStatus]): RDD[Row] = {
JacksonParser(
- baseRDD(),
- schema,
+ inputRDD.getOrElse(createBaseRdd(inputPaths)),
+ StructType(requiredColumns.map(dataSchema(_))),
sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
}
- override def buildScan(requiredColumns: Seq[Attribute], filters:
Seq[Expression]): RDD[Row] = {
- // Rely on a type erasure hack to pass RDD[InternalRow] back as
RDD[Row]
- JacksonParser(
- baseRDD(),
- StructType.fromAttributes(requiredColumns),
- sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
+ override def equals(other: Any): Boolean = other match {
+ case that: JSONRelation =>
+ (inputRDD match {
+ case Some(rdd) if that.inputRDD.isDefined => rdd eq
that.inputRDD.get
+ case None if that.inputRDD.isEmpty => true
+ case _ => false
+ }) && paths.toSet == that.paths.toSet &&
--- End diff --
This pattern match can be refactored to:
```scala
(this.inputRDD, that.inputRDD) match {
case (Some(thisRDD), Some(thatRDD)) => thisRDD eq thatRDD
case (None, None) => true
case _
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]