Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19224#discussion_r161566402
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
    @@ -143,28 +139,20 @@ object MultiLineJsonDataSource extends JsonDataSource 
{
           sparkSession: SparkSession,
           inputPaths: Seq[FileStatus],
           parsedOptions: JSONOptions): StructType = {
    -    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, 
inputPaths)
    -    val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, 
parsedOptions)
    -    JsonInferSchema.infer(sampled, parsedOptions, createParser)
    +    val json: RDD[String] = createBaseRdd(sparkSession, inputPaths)
    +    val sampled: RDD[String] = JsonUtils.sample(json, parsedOptions)
    +    JsonInferSchema.infer(sampled, parsedOptions, 
CreateJacksonParser.string)
       }
     
       private def createBaseRdd(
           sparkSession: SparkSession,
    -      inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
    -    val paths = inputPaths.map(_.getPath)
    -    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
    -    val conf = job.getConfiguration
    -    val name = paths.mkString(",")
    -    FileInputFormat.setInputPaths(job, paths: _*)
    -    new BinaryFileRDD(
    -      sparkSession.sparkContext,
    -      classOf[StreamInputFormat],
    -      classOf[String],
    -      classOf[PortableDataStream],
    -      conf,
    -      sparkSession.sparkContext.defaultMinPartitions)
    -      .setName(s"JsonFile: $name")
    -      .values
    +      inputPaths: Seq[FileStatus]): RDD[String] = {
    +    val inputPathsString = inputPaths.map(_.getPath).mkString(",")
    +    val wholeFilesRDD = 
sparkSession.sparkContext.wholeTextFiles(inputPathsString)
    +    wholeFilesRDD.flatMap { fileContent =>
    +      val is = new ByteArrayInputStream(fileContent._2.getBytes)
    +      JacksonParser.splitDocuments(is)
    +    }
       }
     
       private def createParser(jsonFactory: JsonFactory, record: 
PortableDataStream): JsonParser = {
    --- End diff --
    
    You can delete this method, since from now on CreateJacksonParser.string is 
used as the factory.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to