Re: Reading CSV with multiLine option invalidates encoding option.

2017-08-17 Thread Han-Cheol Cho
Hi,

Thank you for your response.
I finally found the cause of this problem.


When the multiLine option is set, the input file is read by the
UnivocityParser.parseStream() method.
This method, in turn, calls convertStream(), which initializes the tokenizer with
tokenizer.beginParsing(inputStream) and parses records using
tokenizer.parseNext().

The problem is that the beginParsing() method uses UTF-8 as its default
character encoding.
As a result, the user-provided "encoding" option is ignored.


When the multiLine option is NOT set, on the other hand, the input file is first
read and decoded in the TextInputCSVDataSource.readFile() method and then
passed to the UnivocityParser.parseIterator() method.
Therefore, no problem occurs in this case.


To solve this problem, I removed the call to the tokenizer.beginParsing()
method in convertStream(), since we cannot access the options.charset variable
there, and added it to two places: the tokenizeStream() and parseStream() methods.
In parseStream(), in particular, I pass the charset as the second
parameter to beginParsing().

I attached the git diff as an attachment.
I appreciate any comments on this.
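
For reference, this is the kind of quick check I plan to run against the patched
build. It is only a sketch in PySpark, assuming a running pyspark session named
spark; b.txt is the Shift-JIS sample from the nkf test quoted below, and with the
patch both reads should show the same decoded row:

```python
# Sanity-check sketch for the patch: the "encoding" option should be honored
# even when multiLine is enabled. "b.txt" is the Shift-JIS file from the nkf
# test quoted below; without the patch, the multiLine read produces mojibake.
df_plain = spark.read.option("encoding", "sjis").csv("b.txt")
df_multi = (spark.read
            .option("encoding", "sjis")
            .option("multiLine", True)
            .csv("b.txt"))

df_plain.show(1, truncate=False)
df_multi.show(1, truncate=False)
```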


Best wishes,
Han-Cheol




On Wed, Aug 16, 2017 at 3:09 PM, Takeshi Yamamuro wrote:

> Hi,
>
> Since the csv source currently supports ASCII-compatible charsets, I
> guess shift-jis also works well.
> You could check Hyukjin's comment in
> https://issues.apache.org/jira/browse/SPARK-21289 for more info.
>
>
> On Wed, Aug 16, 2017 at 2:54 PM, Han-Cheol Cho  wrote:
>
>> My apologies,
>>
>> It was a problem with our Hadoop cluster.
>> When we tested the same code on another cluster (HDP-based), it worked
>> without any problem.
>>
>> ```scala
>> ## make sjis text
>> cat a.txt
>> 8月データだけでやってみよう
>> nkf -W -s a.txt >b.txt
>> cat b.txt
>> 87n%G!<%?$@$1$G$d$C$F$_$h$&
>> nkf -s -w b.txt
>> 8月データだけでやってみよう
>> hdfs dfs -put a.txt b.txt
>>
>> ## YARN mode test
>> spark.read.option("encoding", "utf-8").csv("a.txt").show(1)
>> +--+
>> |   _c0|
>> +--+
>> |8月データだけでやってみよう|
>> +--+
>>
>> spark.read.option("encoding", "sjis").csv("b.txt").show(1)
>> +--+
>> |   _c0|
>> +--+
>> |8月データだけでやってみよう|
>> +--+
>>
>> spark.read.option("encoding", "utf-8").option("multiLine",
>> true).csv("a.txt").show(1)
>> +--+
>> |   _c0|
>> +--+
>> |8月データだけでやってみよう|
>> +--+
>>
>> spark.read.option("encoding", "sjis").option("multiLine",
>> true).csv("b.txt").show(1)
>> +--+
>> |   _c0|
>> +--+
>> |8月データだけでやってみよう|
>> +--+
>> ```
>>
>> I am still digging into the root cause and will share it later :-)
>>
>> Best wishes,
>> Han-Choel
>>
>>
>> On Wed, Aug 16, 2017 at 1:32 PM, Han-Cheol Cho wrote:
>>
>>> Dear Spark ML members,
>>>
>>>
>>> I experienced trouble using the "multiLine" option to load CSV data
>>> with Shift-JIS encoding.
>>> When option("multiLine", true) is specified, option("encoding",
>>> "encoding-name") just doesn't work anymore.
>>>
>>>
>>> In CSVDataSource.scala file, I found that MultiLineCSVDataSource.readFile()
>>> method doesn't use parser.options.charset at all.
>>>
>>> object MultiLineCSVDataSource extends CSVDataSource {
>>>   override val isSplitable: Boolean = false
>>>
>>>   override def readFile(
>>>   conf: Configuration,
>>>   file: PartitionedFile,
>>>   parser: UnivocityParser,
>>>   schema: StructType): Iterator[InternalRow] = {
>>> UnivocityParser.parseStream(
>>>   CodecStreams.createInputStreamWithCloseResource(conf,
>>> file.filePath),
>>>   parser.options.headerFlag,
>>>   parser,
>>>   schema)
>>>   }
>>>   ...
>>>
>>> On the other hand, TextInputCSVDataSource.readFile() method uses it:
>>>
>>>   override def readFile(
>>>   conf: Configuration,
>>>   file: PartitionedFile,
>>>   parser: UnivocityParser,
>>>   schema: StructType): Iterator[InternalRow] = {
>>> val lines = {
>>>   val linesReader = new HadoopFileLinesReader(file, conf)
>>>   Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ =>
>>> linesReader.close()))
>>>   linesReader.map { line =>
>>> new String(line.getBytes, 0, line.getLength,
>>> parser.options.charset)  // <-- charset option is used here
>>>   }
>>> }
>>>
>>> val shouldDropHeader = parser.options.headerFlag && file.start == 0
>>> UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
>>>   }

Re: Reading CSV with multiLine option invalidates encoding option.

2017-08-15 Thread Han-Cheol Cho
My apologies,

It was a problem with our Hadoop cluster.
When we tested the same code on another cluster (HDP-based), it worked
without any problem.

```scala
## make sjis text
cat a.txt
8月データだけでやってみよう
nkf -W -s a.txt >b.txt
cat b.txt
87n%G!<%?$@$1$G$d$C$F$_$h$&
nkf -s -w b.txt
8月データだけでやってみよう
hdfs dfs -put a.txt b.txt

## YARN mode test
spark.read.option("encoding", "utf-8").csv("a.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "sjis").csv("b.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "utf-8").option("multiLine",
true).csv("a.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+

spark.read.option("encoding", "sjis").option("multiLine",
true).csv("b.txt").show(1)
+--+
|   _c0|
+--+
|8月データだけでやってみよう|
+--+
```

I am still digging into the root cause and will share it later :-)

Best wishes,
Han-Choel


On Wed, Aug 16, 2017 at 1:32 PM, Han-Cheol Cho  wrote:

> Dear Spark ML members,
>
>
> I experienced trouble using the "multiLine" option to load CSV data with
> Shift-JIS encoding.
> When option("multiLine", true) is specified, option("encoding",
> "encoding-name") just doesn't work anymore.
>
>
> In CSVDataSource.scala file, I found that MultiLineCSVDataSource.readFile()
> method doesn't use parser.options.charset at all.
>
> object MultiLineCSVDataSource extends CSVDataSource {
>   override val isSplitable: Boolean = false
>
>   override def readFile(
>   conf: Configuration,
>   file: PartitionedFile,
>   parser: UnivocityParser,
>   schema: StructType): Iterator[InternalRow] = {
> UnivocityParser.parseStream(
>   CodecStreams.createInputStreamWithCloseResource(conf,
> file.filePath),
>   parser.options.headerFlag,
>   parser,
>   schema)
>   }
>   ...
>
> On the other hand, TextInputCSVDataSource.readFile() method uses it:
>
>   override def readFile(
>   conf: Configuration,
>   file: PartitionedFile,
>   parser: UnivocityParser,
>   schema: StructType): Iterator[InternalRow] = {
> val lines = {
>   val linesReader = new HadoopFileLinesReader(file, conf)
>   Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ =>
> linesReader.close()))
>   linesReader.map { line =>
> new String(line.getBytes, 0, line.getLength,
> parser.options.charset)  // <-- charset option is used here
>   }
> }
>
> val shouldDropHeader = parser.options.headerFlag && file.start == 0
> UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
>   }
>
>
> It seems like a bug.
> Is there anyone who had the same problem before?
>
>
> Best wishes,
> Han-Cheol
>
> --
> ==
> Han-Cheol Cho, Ph.D.
> Data scientist, Data Science Team, Data Laboratory
> NHN Techorus Corp.
>
> Homepage: https://sites.google.com/site/priancho/
> ==
>



-- 
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==


Reading CSV with multiLine option invalidates encoding option.

2017-08-15 Thread Han-Cheol Cho
Dear Spark ML members,


I experienced trouble using the "multiLine" option to load CSV data with
Shift-JIS encoding.
When option("multiLine", true) is specified, option("encoding",
"encoding-name") just doesn't work anymore.


In CSVDataSource.scala file, I found that MultiLineCSVDataSource.readFile()
method doesn't use parser.options.charset at all.

object MultiLineCSVDataSource extends CSVDataSource {
  override val isSplitable: Boolean = false

  override def readFile(
  conf: Configuration,
  file: PartitionedFile,
  parser: UnivocityParser,
  schema: StructType): Iterator[InternalRow] = {
UnivocityParser.parseStream(
  CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
  parser.options.headerFlag,
  parser,
  schema)
  }
  ...

On the other hand, TextInputCSVDataSource.readFile() method uses it:

  override def readFile(
  conf: Configuration,
  file: PartitionedFile,
  parser: UnivocityParser,
  schema: StructType): Iterator[InternalRow] = {
val lines = {
  val linesReader = new HadoopFileLinesReader(file, conf)
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ =>
linesReader.close()))
  linesReader.map { line =>
new String(line.getBytes, 0, line.getLength,
parser.options.charset)  // <-- charset option is used here
  }
}

val shouldDropHeader = parser.options.headerFlag && file.start == 0
UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
  }


It seems like a bug.
Is there anyone who had the same problem before?
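
As a stopgap on our side, I am considering transcoding the input to UTF-8 before
reading it with multiLine. A rough sketch in PySpark (the file names are just
examples, and it assumes a local copy of the file, e.g. fetched with hdfs dfs -get):

```python
# Workaround sketch: convert the Shift-JIS file to UTF-8 first, so that the
# default decoding used by the multiLine code path yields correct text.
import codecs

with codecs.open("b_sjis.csv", "r", encoding="shift_jis") as src, \
        codecs.open("b_utf8.csv", "w", encoding="utf-8") as dst:
    dst.write(src.read())

# After putting b_utf8.csv back on HDFS, multiLine can be used without relying
# on the (currently ignored) "encoding" option.
df = spark.read.option("multiLine", True).csv("b_utf8.csv")
df.show(1, truncate=False)
```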


Best wishes,
Han-Cheol

-- 
==
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==


strange usage of tempfile.mkdtemp() in PySpark mllib.recommendation doctest

2017-03-02 Thread Han-Cheol Cho
Dear Spark user mailinglist members,


In PySpark's mllib.recommendation doctest, I found a somewhat strange usage of
the temporary-directory creation function, tempfile.mkdtemp(), in the following
part.
# https://github.com/apache/spark/blob/master/python/pyspark/mllib/recommendation.py

...
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = MatrixFactorizationModel.load(sc, path)
>>> sameModel.predict(2, 2)
0.4...
>>> sameModel.predictAll(testset).collect()
[Rating(...
>>> from shutil import rmtree
>>> try:
...     rmtree(path)
... except OSError:
...     pass

As I understand it, calling tempfile.mkdtemp() creates a temporary directory
on the LOCAL machine.
However, model.save(sc, path) saves the model data in HDFS.
In the end, the doctest removes only the LOCAL temp directory using shutil.rmtree().
Shouldn't we delete the temporary directory in HDFS too?
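
For illustration, this is roughly the cleanup I have in mind. It is only a
sketch, and it goes through the py4j gateway (sc._jvm and sc._jsc are internal
attributes, not public API):

```python
# Sketch: remove both the local temp dir from tempfile.mkdtemp() and the
# directory of the same name that model.save(sc, path) may have written to
# on the default file system (e.g. HDFS).
from shutil import rmtree


def cleanup_model_dir(sc, path):
    # local temporary directory created by tempfile.mkdtemp()
    try:
        rmtree(path)
    except OSError:
        pass

    # the same path on the cluster's default file system, if it exists there
    jvm = sc._jvm
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    hdfs_path = jvm.org.apache.hadoop.fs.Path(path)
    if fs.exists(hdfs_path):
        fs.delete(hdfs_path, True)
```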


Best wishes,
HanCheol








A question about inconsistency during dataframe creation with RDD/dict in PySpark

2017-02-01 Thread Han-Cheol Cho
Dear spark user ml members,


I have quite messy input data, so it is difficult to load it as a dataframe
object directly.
What I did is to load it as an RDD of strings first, convert it to an RDD of
pyspark.sql.Row objects, and then use the toDF method as below.

mydf = myrdd.map(parse).toDF()

I didn't expect any problem from this very simple code at first.


But, when I tested it with a bunch of data, I found that this approach fails
with the following exception:

java.lang.IllegalStateException: Input row doesn't have expected number of
values required by the schema. 10 fields are required while 9 values are
provided.
    at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:147)
    at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
    at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    ...

This exception comes from the fact that some Row objects in the RDD have missing
fields.
For example, the following code fails with the same exception:

d1 = [Row(k1="value1.1", k2="value1.2")]
d2 = [Row(k1="value2.1")]

rdd1 = spark.sparkContext.parallelize(d1)
rdd2 = spark.sparkContext.parallelize(d2)

urdd = rdd1.union(rdd2)
urdd.collect()
[Row(k1='value1.1', k2='value1.2'), Row(k1='value2.1')]

urdd.toDF()
DataFrame[k1: string, k2: string]

urdd.toDF().show()
--> fails with the same exception

While digging into the code, I found that a Row object raises an exception if
it does not have a given key, as follows:

# spark/python/pyspark/sql/types.py
def _verify_type(obj, dataType, nullable=True):
    ...
    elif isinstance(dataType, StructType):
        ...
        elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
            # the order in obj could be different than dataType.fields
            for f in dataType.fields:
                _verify_type(obj[f.name], f.dataType, f.nullable)
                # --> obj[f.name] raises a ValueError(item) exception if the
                #     key does not exist.


I think that raising an exception in this situation is a reasonable approach.
However, if I use an RDD of dict objects instead of Row objects, the conversion
process succeeds as follows, filling missing columns with null values.
dict1 = [{"k1":"v1.1", "k2":"v1.2"}]
dict2 = [{"k1":"v2.1"}]

rdd1 = spark.sparkContext.parallelize(dict1)
rdd2 = spark.sparkContext.parallelize(dict2)
rdd1.collect()
[{'k2': 'v1.2', 'k1': 'v1.1'}]
rdd2.collect()
[{'k1': 'v2.1'}]

urdd = rdd1.union(rdd2)
urdd.collect()
[{'k2': 'v1.2', 'k1': 'v1.1'}, {'k1': 'v2.1'}]

spark.createDataFrame(urdd).show()
+----+----+
|  k1|  k2|
+----+----+
|v1.1|v1.2|
|v2.1|null|
+----+----+

urdd.toDF().show()
+----+----+
|  k1|  k2|
+----+----+
|v1.1|v1.2|
|v2.1|null|
+----+----+


I wonder whether this difference is an expected result or not.
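
For what it is worth, a workaround that gives the dict-like behavior for the
Row-based RDD is to bridge it through dicts with Row.asDict() and pass an
explicit schema. This is only a sketch, reusing the data from the first example
above:

```python
# Workaround sketch: convert each Row to a dict, then create the DataFrame with
# an explicit schema so that missing keys become nulls instead of triggering
# the _verify_type() error.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

d1 = [Row(k1="value1.1", k2="value1.2")]
d2 = [Row(k1="value2.1")]
row_rdd = spark.sparkContext.parallelize(d1).union(spark.sparkContext.parallelize(d2))

schema = StructType([
    StructField("k1", StringType(), True),
    StructField("k2", StringType(), True),
])

dict_rdd = row_rdd.map(lambda row: row.asDict())
spark.createDataFrame(dict_rdd, schema).show()
# expected:
# +--------+--------+
# |      k1|      k2|
# +--------+--------+
# |value1.1|value1.2|
# |value2.1|    null|
# +--------+--------+
```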


Best wishes,
Han-cheol




null values returned by max() over a window function

2016-11-28 Thread Han-Cheol Cho
Hello,

I am trying to test Spark's SQL window functions following this blog post,
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html,
and I am facing a problem as follows:

# testing rowsBetween()
winSpec2 = window.Window.partitionBy(data["category"]).orderBy(data["revenue"]).rowsBetween(2, 2)
tmp4 = functions.max(data["revenue"]).over(winSpec2)
data.select(["product", "category", "revenue", tmp4.alias("rowbetween2and2")]) \
    .orderBy(["category", "revenue"]).show()

+----------+----------+-------+---------------+
|   product|  category|revenue|rowbetween2and2|
+----------+----------+-------+---------------+
|  Bendable|Cell phone|   3000|           5000|
|  Foldable|Cell phone|   3000|           6000|
|Ultra thin|Cell phone|   5000|           6000|
|      Thin|Cell phone|   6000|           null|  --> ???
| Very thin|Cell phone|   6000|           null|
|    Normal|    Tablet|   1500|           4500|
|       Big|    Tablet|   2500|           5500|
|       Pro|    Tablet|   4500|           6500|
|      Mini|    Tablet|   5500|           null|
|      Pro2|    Tablet|   6500|           null|
+----------+----------+-------+---------------+

As you can see, the last column calculates the max value among the current row,
the two rows before, and the two rows after, partitioned by the category column.
However, the result for the last two rows in each category partition is null.
Is there something that I missed, or is this a bug?