Repository: spark
Updated Branches:
  refs/heads/master 63b200e8d -> 83775bc78


[SPARK-14158][SQL] implement buildReader for json data source

## What changes were proposed in this pull request?

This PR implements `buildReader` for the JSON data source and enables it in the new
data source code path.

## How was this patch tested?

Covered by existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #11960 from cloud-fan/json.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83775bc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83775bc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83775bc7

Branch: refs/heads/master
Commit: 83775bc78e183791f75a99cdfbcd68a67ca0d472
Parents: 63b200e
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue Mar 29 14:34:12 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Mar 29 14:34:12 2016 +0800

----------------------------------------------------------------------
 .../datasources/FileSourceStrategy.scala        |  4 +-
 .../datasources/HadoopFileLinesReader.scala     | 51 ++++++++++++++++++++
 .../datasources/json/JSONRelation.scala         | 37 +++++++++++++-
 .../datasources/json/JacksonParser.scala        |  2 +-
 4 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83775bc7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4b04fec..76a724e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -58,7 +58,8 @@ private[sql] object FileSourceStrategy extends Strategy with 
Logging {
     case PhysicalOperation(projects, filters, l @ LogicalRelation(files: 
HadoopFsRelation, _, _))
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
-         files.fileFormat.toString == "ORC") &&
+         files.fileFormat.toString == "ORC" ||
+         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
          files.sqlContext.conf.parquetFileScan =>
       // Filters on this relation fall into four categories based on where we 
can use them to avoid
       // reading unneeded data:
@@ -138,7 +139,6 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
 
           val splitFiles = selectedPartitions.flatMap { partition =>
             partition.files.flatMap { file =>
-              assert(file.getLen != 0, file.toString)
               (0L to file.getLen by maxSplitBytes).map { offset =>
                 val remaining = file.getLen - offset
                 val size = if (remaining > maxSplitBytes) maxSplitBytes else 
remaining

http://git-wip-us.apache.org/repos/asf/spark/blob/83775bc7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
new file mode 100644
index 0000000..18f9b55
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+/**
+ * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which 
are all of the lines
+ * in that file.
+ */
+class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) 
extends Iterator[Text] {
+  private val iterator = {
+    val fileSplit = new FileSplit(
+      new Path(new URI(file.filePath)),
+      file.start,
+      file.length,
+      // TODO: Implement Locality
+      Array.empty)
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+    val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+    val reader = new LineRecordReader()
+    reader.initialize(fileSplit, hadoopAttemptContext)
+    new RecordReaderIterator(reader)
+  }
+
+  override def hasNext: Boolean = iterator.hasNext
+
+  override def next(): Text = iterator.next()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/83775bc7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3bf0af0..21fc122 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -32,7 +33,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -120,6 +122,39 @@ class DefaultSource extends FileFormat with 
DataSourceRegister {
     }
   }
 
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] 
= {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    val parsedOptions: JSONOptions = new JSONOptions(options)
+    val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
+      .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
+
+    val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+    val joinedRow = new JoinedRow()
+
+    file => {
+      val lines = new HadoopFileLinesReader(file, 
broadcastedConf.value.value).map(_.toString)
+
+      val rows = JacksonParser.parseJson(
+        lines,
+        dataSchema,
+        columnNameOfCorruptRecord,
+        parsedOptions)
+
+      val appendPartitionColumns = 
GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      rows.map { row =>
+        appendPartitionColumns(joinedRow(row, file.partitionValues))
+      }
+    }
+  }
+
   private def createBaseRdd(sqlContext: SQLContext, inputPaths: 
Seq[FileStatus]): RDD[String] = {
     val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
     val conf = job.getConfiguration

http://git-wip-us.apache.org/repos/asf/spark/blob/83775bc7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 00c14ad..8bc53ba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -250,7 +250,7 @@ object JacksonParser extends Logging {
     new GenericArrayData(values.toArray)
   }
 
-  private def parseJson(
+  def parseJson(
       input: Iterator[String],
       schema: StructType,
       columnNameOfCorruptRecords: String,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to