Re: Reading CSV with multiLine option invalidates encoding option.
Hi,

Thank you for your response. I finally found the cause of this.

When the multiLine option is set, the input file is read by the UnivocityParser.parseStream() method. This method, in turn, calls convertStream(), which initializes the tokenizer with tokenizer.beginParsing(inputStream) and parses records with tokenizer.parseNext(). The problem is that beginParsing() uses UTF-8 as its default character encoding, so the user-provided "encoding" option is ignored.

When the multiLine option is NOT set, on the other hand, the input file is first read and decoded in the TextInputCSVDataSource.readFile() method and then passed to the UnivocityParser.parseIterator() method, so no problem occurs in that case.

To solve this, I removed the call to tokenizer.beginParsing() from convertStream(), since the options.charset variable is not accessible there, and added it in two places instead: the tokenizeStream() and parseStream() methods. In particular, in parseStream() I pass the charset as the second parameter of beginParsing().

I attached the git diff content as an attachment file. I appreciate any comments on this.

Best wishes,
Han-Cheol

On Wed, Aug 16, 2017 at 3:09 PM, Takeshi Yamamuro wrote:

> Hi,
>
> Since the csv source currently supports ASCII-compatible charsets, I
> guess shift-jis also works well.
> You could check Hyukjin's comment in
> https://issues.apache.org/jira/browse/SPARK-21289 for more info.
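As a user-level stopgap in the meantime (only a sketch, not the attached patch; the output directory name b_utf8 and the use of binaryFiles() for transcoding are my own choices), the Shift-JIS file could be rewritten as UTF-8 before reading it with multiLine, so the tokenizer's UTF-8 default no longer matters:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read each file whole as bytes, decode it explicitly, and rewrite it as UTF-8
# text (each file becomes one record here, so this is meant for small inputs).
raw = spark.sparkContext.binaryFiles("b.txt")              # (path, bytes) pairs
utf8 = raw.map(lambda kv: kv[1].decode("shift_jis"))
utf8.saveAsTextFile("b_utf8")                              # Spark writes UTF-8 text

# The multiLine path now decodes correctly because the data really is UTF-8.
spark.read.option("encoding", "utf-8") \
     .option("multiLine", True) \
     .csv("b_utf8") \
     .show(1)
```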
Re: Reading CSV with multiLine option invalidates encoding option.
My apologies,

It was a problem with our Hadoop cluster. When we tested the same code on another cluster (HDP-based), it worked without any problem.

```scala
## make sjis text
cat a.txt
8月データだけでやってみよう
nkf -W -s a.txt >b.txt
cat b.txt
87n%G!<%?$@$1$G$d$C$F$_$h$&
nkf -s -w b.txt
8月データだけでやってみよう
hdfs dfs -put a.txt b.txt

## YARN mode test
spark.read.option("encoding", "utf-8").csv("a.txt").show(1)
+--------------+
|           _c0|
+--------------+
|8月データだけでやってみよう|
+--------------+

spark.read.option("encoding", "sjis").csv("b.txt").show(1)
+--------------+
|           _c0|
+--------------+
|8月データだけでやってみよう|
+--------------+

spark.read.option("encoding", "utf-8").option("multiLine", true).csv("a.txt").show(1)
+--------------+
|           _c0|
+--------------+
|8月データだけでやってみよう|
+--------------+

spark.read.option("encoding", "sjis").option("multiLine", true).csv("b.txt").show(1)
+--------------+
|           _c0|
+--------------+
|8月データだけでやってみよう|
+--------------+
```

I am still digging into the root cause and will share it later :-)

Best wishes,
Han-Cheol

--
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==
Reading CSV with multiLine option invalidates encoding option.
Dear Spark mailing list members,

I ran into trouble using the "multiLine" option to load CSV data with Shift-JIS encoding. When option("multiLine", true) is specified, option("encoding", "encoding-name") just doesn't work anymore.

In CSVDataSource.scala, I found that the MultiLineCSVDataSource.readFile() method doesn't use parser.options.charset at all:

  object MultiLineCSVDataSource extends CSVDataSource {
    override val isSplitable: Boolean = false

    override def readFile(
        conf: Configuration,
        file: PartitionedFile,
        parser: UnivocityParser,
        schema: StructType): Iterator[InternalRow] = {
      UnivocityParser.parseStream(
        CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
        parser.options.headerFlag,
        parser,
        schema)
    }
    ...

On the other hand, the TextInputCSVDataSource.readFile() method uses it:

  override def readFile(
      conf: Configuration,
      file: PartitionedFile,
      parser: UnivocityParser,
      schema: StructType): Iterator[InternalRow] = {
    val lines = {
      val linesReader = new HadoopFileLinesReader(file, conf)
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ =>
        linesReader.close()))
      linesReader.map { line =>
        new String(line.getBytes, 0, line.getLength,
          parser.options.charset)   // <-- the charset option is used here
      }
    }

    val shouldDropHeader = parser.options.headerFlag && file.start == 0
    UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
  }

It looks like a bug to me. Has anyone run into the same problem before?

Best wishes,
Han-Cheol

--
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==
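For reference, a minimal PySpark version of the check described above (b.txt is assumed to be a Shift-JIS encoded CSV file, as in the tests elsewhere in this thread):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Without multiLine the "encoding" option is honoured:
spark.read.option("encoding", "sjis").csv("b.txt").show(1)

# With multiLine the charset option is not applied and the bytes end up being
# decoded as UTF-8, which is the behaviour described above:
spark.read.option("encoding", "sjis").option("multiLine", True).csv("b.txt").show(1)
```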
strange usage of tempfile.mkdtemp() in PySpark mllib.recommendation doctest
Dear Spark user mailing list members,

In PySpark's mllib.recommendation doctest, I found a somewhat strange use of the temporary-directory creation function, tempfile.mkdtemp(), in the following part.

# https://github.com/apache/spark/blob/master/python/pyspark/mllib/recommendation.py
...
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = MatrixFactorizationModel.load(sc, path)
>>> sameModel.predict(2, 2)
0.4...
>>> sameModel.predictAll(testset).collect()
[Rating(...
>>> from shutil import rmtree
>>> try:
...     rmtree(path)
... except OSError:
...     pass

As I understand it, calling tempfile.mkdtemp() creates a temporary directory on the LOCAL machine. However, model.save(sc, path) saves the model data in HDFS. In the end, the doctest removes only the LOCAL temp directory using shutil.rmtree().

Shouldn't we delete the temporary directory in HDFS too?

Best wishes,
Han-Cheol

Han-Cheol Cho
Data Laboratory / Data Scientist
13F Shinjuku Eastside Square, 6-27-30 Shinjuku, Shinjuku-ku, Tokyo 160-0022
Email hancheol@nhn-techorus.com
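If the doctest were also to clean up whatever model.save() wrote to the cluster's default filesystem, one possible approach is to go through the Hadoop FileSystem API from Python. This is only a sketch; the helper name delete_path() and the use of the sc._jvm / sc._jsc internals are my own assumptions, not part of the existing doctest:

```python
def delete_path(sc, path):
    """Recursively delete `path` on the SparkContext's default filesystem
    (HDFS on a typical cluster), mirroring what shutil.rmtree() does locally."""
    jvm = sc._jvm
    hadoop_conf = sc._jsc.hadoopConfiguration()
    fs_path = jvm.org.apache.hadoop.fs.Path(path)
    fs = fs_path.getFileSystem(hadoop_conf)
    if fs.exists(fs_path):
        fs.delete(fs_path, True)  # True => recursive delete

# e.g. after the doctest's rmtree(path):
# delete_path(sc, path)
```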
A question about inconsistency during dataframe creation with RDD/dict in PySpark
Dear Spark user mailing list members,

I have quite messy input data, so it is difficult to load it into a DataFrame directly. What I did was to load it as an RDD of strings first, convert it to an RDD of pyspark.sql.Row objects, and then use the toDF() method as below:

  mydf = myrdd.map(parse).toDF()

I didn't expect any problem from this very simple code at first. But when I tested it with a bunch of data, I found that this approach fails with the following exception:

  java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 10 fields are required while 9 values are provided.
          at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:147)
          at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
          at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
          at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
          ...

The exception comes from the fact that some Row objects in the RDD have missing fields. For example, the following example fails with the same exception:

  d1 = [Row(k1="value1.1", k2="value1.2")]
  d2 = [Row(k1="value2.1")]
  rdd1 = spark.sparkContext.parallelize(d1)
  rdd2 = spark.sparkContext.parallelize(d2)
  urdd = rdd1.union(rdd2)

  urdd.collect()
  [Row(k1='value1.1', k2='value1.2'), Row(k1='value2.1')]

  urdd.toDF()
  DataFrame[k1: string, k2: string]

  urdd.toDF().show()   --> fails with the same exception

While digging into the code, I found that a Row object raises an exception if it does not have a given key:

  # spark/python/pyspark/sql/types.py
  def _verify_type(obj, dataType, nullable=True):
      ...
      elif isinstance(dataType, StructType):
          ...
          elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
              # the order in obj could be different than dataType.fields
              for f in dataType.fields:
                  _verify_type(obj[f.name], f.dataType, f.nullable)

  --> obj[f.name] raises a ValueError(item) exception if the key does not exist.

I think that raising an exception in this situation is a reasonable approach. However, if I use an RDD of dict objects instead of Row objects, the conversion succeeds and fills the missing columns with null values:

  dict1 = [{"k1":"v1.1", "k2":"v1.2"}]
  dict2 = [{"k1":"v2.1"}]
  rdd1 = spark.sparkContext.parallelize(dict1)
  rdd2 = spark.sparkContext.parallelize(dict2)

  rdd1.collect()
  [{'k2': 'v1.2', 'k1': 'v1.1'}]
  rdd2.collect()
  [{'k1': 'v2.1'}]

  urdd = rdd1.union(rdd2)
  urdd.collect()
  [{'k2': 'v1.2', 'k1': 'v1.1'}, {'k1': 'v2.1'}]

  spark.createDataFrame(urdd).show()
  +----+----+
  |  k1|  k2|
  +----+----+
  |v1.1|v1.2|
  |v2.1|null|
  +----+----+

  urdd.toDF().show()
  +----+----+
  |  k1|  k2|
  +----+----+
  |v1.1|v1.2|
  |v2.1|null|
  +----+----+

I wonder whether this difference is an expected result or not.

Best wishes,
Han-Cheol

Han-Cheol Cho
Data Laboratory / Data Scientist
13F Shinjuku Eastside Square, 6-27-30 Shinjuku, Shinjuku-ku, Tokyo 160-0022
Email hancheol@nhn-techorus.com
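As a possible workaround (only a sketch, not a claim about the intended behaviour), the Row objects can be turned into plain dicts before building the DataFrame, so that the dict-based path shown above fills the missing keys with null:

```python
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

d1 = [Row(k1="value1.1", k2="value1.2")]
d2 = [Row(k1="value2.1")]
urdd = spark.sparkContext.parallelize(d1).union(spark.sparkContext.parallelize(d2))

# Row.asDict() keeps only the fields each Row actually has; createDataFrame()
# on an RDD of dicts then fills the missing keys with null, as in the dict
# example above (k2 becomes null for the second row).
spark.createDataFrame(urdd.map(lambda r: r.asDict())).show()
```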
null values returned by max() over a window function
Hello,

I am trying to test Spark's SQL window functions following this blog post:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
and I am facing a problem as follows.

  # testing rowsBetween()
  winSpec2 = window.Window.partitionBy(data["category"]).orderBy(data["revenue"]).rowsBetween(2, 2)
  tmp4 = functions.max(data["revenue"]).over(winSpec2)
  data.select(["product", "category", "revenue", tmp4.alias("rowbetween2and2")]) \
      .orderBy(["category", "revenue"]).show()

  +----------+----------+-------+---------------+
  |   product|  category|revenue|rowbetween2and2|
  +----------+----------+-------+---------------+
  |  Bendable|Cell phone|   3000|           5000|
  |  Foldable|Cell phone|   3000|           6000|
  |Ultra thin|Cell phone|   5000|           6000|
  |      Thin|Cell phone|   6000|           null|   --> ???
  | Very thin|Cell phone|   6000|           null|
  |    Normal|    Tablet|   1500|           4500|
  |       Big|    Tablet|   2500|           5500|
  |       Pro|    Tablet|   4500|           6500|
  |      Mini|    Tablet|   5500|           null|
  |      Pro2|    Tablet|   6500|           null|
  +----------+----------+-------+---------------+

As you can see, the last column calculates the max value among the current row, the two rows to its left, and the two rows to its right, partitioned by the category column. However, the result for the last two rows in each category partition is null.

Is there something that I missed, or is this a bug?

Han-Cheol Cho
Data Laboratory / Data Scientist
13F Shinjuku Eastside Square, 6-27-30 Shinjuku, Shinjuku-ku, Tokyo 160-0022
Email hancheol@nhn-techorus.com
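For comparison, here is a small self-contained sketch (the data is rebuilt from the rows shown above, and the window and alias names are my own) contrasting the frame actually used, rowsBetween(2, 2), with a frame that spans two rows before through two rows after the current row, rowsBetween(-2, 2):

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql import window

spark = SparkSession.builder.getOrCreate()

data = spark.createDataFrame(
    [("Bendable", "Cell phone", 3000), ("Foldable", "Cell phone", 3000),
     ("Ultra thin", "Cell phone", 5000), ("Thin", "Cell phone", 6000),
     ("Very thin", "Cell phone", 6000), ("Normal", "Tablet", 1500),
     ("Big", "Tablet", 2500), ("Pro", "Tablet", 4500),
     ("Mini", "Tablet", 5500), ("Pro2", "Tablet", 6500)],
    ["product", "category", "revenue"])

# rowsBetween(2, 2): the frame contains only the row two positions AFTER the
# current row, so it is empty for the last two rows of each partition.
win_after2 = (window.Window.partitionBy("category")
              .orderBy("revenue").rowsBetween(2, 2))

# rowsBetween(-2, 2): two rows before the current row through two rows after it.
win_around2 = (window.Window.partitionBy("category")
               .orderBy("revenue").rowsBetween(-2, 2))

(data.select("product", "category", "revenue",
             F.max("revenue").over(win_after2).alias("rowbetween2and2"),
             F.max("revenue").over(win_around2).alias("rowbetween_minus2and2"))
     .orderBy("category", "revenue")
     .show())
```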