Github user attilapiros commented on a diff in the pull request:
https://github.com/apache/spark/pull/19224#discussion_r161704328
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---
@@ -143,28 +139,20 @@ object MultiLineJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
-    val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
-    JsonInferSchema.infer(sampled, parsedOptions, createParser)
+    val json: RDD[String] = createBaseRdd(sparkSession, inputPaths)
+    val sampled: RDD[String] = JsonUtils.sample(json, parsedOptions)
+    JsonInferSchema.infer(sampled, parsedOptions, CreateJacksonParser.string)
   }
 
   private def createBaseRdd(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
-    val paths = inputPaths.map(_.getPath)
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
-    val conf = job.getConfiguration
-    val name = paths.mkString(",")
-    FileInputFormat.setInputPaths(job, paths: _*)
-    new BinaryFileRDD(
-      sparkSession.sparkContext,
-      classOf[StreamInputFormat],
-      classOf[String],
-      classOf[PortableDataStream],
-      conf,
-      sparkSession.sparkContext.defaultMinPartitions)
-      .setName(s"JsonFile: $name")
-      .values
+      inputPaths: Seq[FileStatus]): RDD[String] = {
+    val inputPathsString = inputPaths.map(_.getPath).mkString(",")
+    val wholeFilesRDD = sparkSession.sparkContext.wholeTextFiles(inputPathsString)
+    wholeFilesRDD.flatMap { fileContent =>
--- End diff --
nit: rename fileContent to filenameWithContent.
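
The rename fits because SparkContext.wholeTextFiles returns an RDD[(String, String)] of (file path, whole file content) pairs, so the flatMap parameter here is a tuple, not just the file body. A minimal sketch of that shape (the object name and the /tmp/json-input path are illustrative, not from the PR):

    import org.apache.spark.sql.SparkSession

    object WholeTextFilesSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("wholeTextFiles-sketch")
          .getOrCreate()

        // wholeTextFiles yields one element per file: a (path, content)
        // pair with the whole file body as a single string.
        val filenameWithContent =
          spark.sparkContext.wholeTextFiles("/tmp/json-input")

        // Destructuring the pair makes both halves explicit, which is
        // what the suggested name spells out.
        filenameWithContent.collect().foreach { case (path, content) =>
          println(s"$path -> ${content.take(40)}")
        }

        spark.stop()
      }
    }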