Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/14151#discussion_r155143333
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
---
@@ -97,14 +109,26 @@ class TextFileFormat extends TextBasedFileFormat with
DataSourceRegister {
assert(
requiredSchema.length <= 1,
"Text data source only produces a single data column named
\"value\".")
-
+ val textOptions = new TextOptions(options)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+ readToUnsafeMem(broadcastedHadoopConf, requiredSchema,
textOptions.wholeText)
+ }
+
+ private def readToUnsafeMem(conf: Broadcast[SerializableConfiguration],
+ requiredSchema: StructType, wholeTextMode: Boolean):
+ (PartitionedFile) => Iterator[UnsafeRow] = {
+
(file: PartitionedFile) => {
- val reader = new HadoopFileLinesReader(file,
broadcastedHadoopConf.value.value)
+ val confValue = conf.value.value
+ var reader: Iterator[Text] with Closeable = null
+ if (!wholeTextMode) {
+ reader = new HadoopFileLinesReader(file, confValue)
+ } else {
+ reader = new HadoopFileWholeTextReader(file, confValue)
+ }
--- End diff --
We can avoid using `var` by assigning the result of the `if` expression directly to a `val`:
```scala
val reader = if (!wholeTextMode) {
new HadoopFileLinesReader(file, confValue)
} else {
new HadoopFileWholeTextReader(file, confValue)
}
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]