Re: PySpark error java.lang.IllegalArgumentException

2023-07-10 Thread elango vaidyanathan
Finally I was able to solve this issue by setting this conf:
"spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder"

Thanks all!



On Sat, 8 Jul 2023 at 3:45 AM, Brian Huynh  wrote:

> Hi Khalid,
>
> Elango mentioned the file is working fine in another of our environments
> with the same driver and executor memory.
>
> Brian
>
> On Jul 7, 2023, at 10:18 AM, Khalid Mammadov 
> wrote:
>
> 
>
> Perhaps that parquet file, or another file in that folder, is corrupted?
> To check, try to read that file with pandas or other tools to see if you
> can read it without Spark.
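
For what it's worth, this is roughly the check Khalid suggested, done outside Spark (a sketch; the file name is the one from the FileScanRDD line in the log):

import pandas as pd

# Sketch of the sanity check: if pandas/pyarrow also fail to read this file,
# the parquet file itself, not Spark, is the likely problem.
df = pd.read_parquet("/data/202301/account_cycle/account_cycle-202301-53.parquet")
print(df.head())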
>
> On Wed, 5 Jul 2023, 07:25 elango vaidyanathan, 
> wrote:
>
>>
>> Hi team,
>>
>> Any updates on this below issue
>>
>> On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan 
>> wrote:
>>
>>>
>>>
>>> Hi all,
>>>
>>> I am reading a parquet file like this, and it gives
>>> java.lang.IllegalArgumentException.
>>> However I can work with other parquet files (such as the NYC taxi parquet
>>> files) without any issue. I have copied the full error log as well. Can you
>>> please take a look and let me know how to fix this?
>>>
>>> import pyspark
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
>>> "20g").config("spark.driver.memory", "50g").getOrCreate()
>>>
>>> df=spark.read.parquet("/data/202301/account_cycle")
>>>
>>> df.printSchema()  # works fine
>>>
>>> df.count()  # works fine
>>>
>>> df.show()  # getting the error below
>>>
>>> >>> df.show()
>>>
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
>>>
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
>>>
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
>>> struct<... account_status: string, currency_code: string, opened_dt: date ... 30 more
>>> fields>
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values
>>> in memory (estimated size 540.6 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
>>> bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
>>> memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from
>>> showString at NativeMethodAccessorImpl.java:0
>>>
>>> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin
>>> packing, max size: 134217728 bytes, open cost is considered as scanning
>>> 4194304 bytes.
>>>
>>> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
>>> NativeMethodAccessorImpl.java:0
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
>>> NativeMethodAccessorImpl.java:0) with 1 output partitions
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
>>> (showString at NativeMethodAccessorImpl.java:0)
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
>>> (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
>>> which has no missing parents
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values
>>> in memory (estimated size 38.1 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
>>> bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
>>> memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
>>>
>>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast
>>> at DAGScheduler.scala:1478
>>>
>>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
>>> ResultStage 14 (MapPartitionsRDD[42] at showString at
>>> NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
>>> Vecto

Re: PySpark error java.lang.IllegalArgumentException

2023-07-05 Thread elango vaidyanathan
Hi team,

Any updates on this below issue

On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan 
wrote:

>
>
> Hi all,
>
> I am reading a parquet file like this, and it gives
> java.lang.IllegalArgumentException.
> However I can work with other parquet files (such as the NYC taxi parquet
> files) without any issue. I have copied the full error log as well. Can you
> please take a look and let me know how to fix this?
>
> import pyspark
>
> from pyspark.sql import SparkSession
>
> spark=SparkSession.builder.appName("testPyspark").config("spark.executor.memory",
> "20g").config("spark.driver.memory", "50g").getOrCreate()
>
> df=spark.read.parquet("/data/202301/account_cycle")
>
> df.printSchema()  # works fine
>
> df.count()  # works fine
>
> df.show()  # getting the error below
>
> >>> df.show()
>
> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
>
> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
>
> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
> struct<... account_status: string, currency_code: string, opened_dt: date ... 30 more
> fields>
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in
> memory (estimated size 540.6 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
> bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
> memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
>
> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString
> at NativeMethodAccessorImpl.java:0
>
> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing,
> max size: 134217728 bytes, open cost is considered as scanning 4194304
> bytes.
>
> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
> NativeMethodAccessorImpl.java:0
>
> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
> NativeMethodAccessorImpl.java:0) with 1 output partitions
>
> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
> (showString at NativeMethodAccessorImpl.java:0)
>
> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
>
> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
>
> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
> (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
> which has no missing parents
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in
> memory (estimated size 38.1 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
> bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
>
> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
> memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
>
> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast
> at DAGScheduler.scala:1478
>
> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
> ResultStage 14 (MapPartitionsRDD[42] at showString at
> NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
> Vector(0))
>
> 23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1
> tasks resource profile 0
>
> 23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0
> (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
> taskResourceAssignments Map()
>
> 23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
>
> 23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
> file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
> 0-134217728, partition values: [empty row]
>
> 23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 48)
>
> java.lang.IllegalArgumentException
>
> at java.nio.Buffer.limit(Buffer.java:275)
>
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
>
> at
> org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
>
> at
> org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
>
> at java.io.DataInputStream.readFully(DataInputStream.java:195)
>
> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>
> at
> org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>
> at
> org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>
> at
> org.apache.parquet.bytes.BytesInput.toInputStream(Byte

PySpark error java.lang.IllegalArgumentException

2023-07-03 Thread elango vaidyanathan
Hi all,

I am reading a parquet file like this, and it gives
java.lang.IllegalArgumentException.
However I can work with other parquet files (such as the NYC taxi parquet
files) without any issue. I have copied the full error log as well. Can you
please take a look and let me know how to fix this?

import pyspark

from pyspark.sql import SparkSession

spark = (SparkSession.builder.appName("testPyspark")
         .config("spark.executor.memory", "20g")
         .config("spark.driver.memory", "50g")
         .getOrCreate())

df=spark.read.parquet("/data/202301/account_cycle")

df.printSchema()  # works fine

df.count()  # works fine

df.show()  # getting the error below

>>> df.show()

23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:

23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:

23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema:
struct<... account_status: string, currency_code: string, opened_dt: date ... 30 more fields>

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in
memory (estimated size 540.6 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as
bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in
memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)

23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString
at NativeMethodAccessorImpl.java:0

23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing,
max size: 134217728 bytes, open cost is considered as scanning 4194304
bytes.

23/07/03 18:07:20 INFO SparkContext: Starting job: showString at
NativeMethodAccessorImpl.java:0

23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at
NativeMethodAccessorImpl.java:0) with 1 output partitions

23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14
(showString at NativeMethodAccessorImpl.java:0)

23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()

23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()

23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14
(MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0),
which has no missing parents

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in
memory (estimated size 38.1 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as
bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)

23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in
memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)

23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast at
DAGScheduler.scala:1478

23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from
ResultStage 14 (MapPartitionsRDD[42] at showString at
NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions
Vector(0))

23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks
resource profile 0

23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID
48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes)
taskResourceAssignments Map()

23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)

23/07/03 18:07:20 INFO FileScanRDD: Reading File path:
file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range:
0-134217728, partition values: [empty row]

23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
48)

java.lang.IllegalArgumentException

at java.nio.Buffer.limit(Buffer.java:275)

at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)

at
org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)

at
org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)

at java.io.DataInputStream.readFully(DataInputStream.java:195)

at java.io.DataInputStream.readFully(DataInputStream.java:169)

at
org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)

at
org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)

at
org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)

at
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary.(PlainValuesDictionary.java:154)

at
org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:96)

at
org.apache.parquet.column.Encoding$5.initDictionary(Encoding.java:163)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.(VectorizedColumnReader.java:114)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:352)

at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:293)

at

Single node spark issue in Sparkly/RStudio

2023-03-16 Thread elango vaidyanathan
Hi team,

On a single Linux node, I would like to set up RStudio with sparklyr. The
dev team is three to four people.
I am aware of the constraints of a single-node Spark cluster. What I want to
know is what happens when more users join in to use sparklyr in RStudio and
Spark runs short of resources: it should simply keep the new jobs in a queue
rather than crash.
I think this is not specific to RStudio/sparklyr; it applies equally to
Spark/PySpark on a single-node cluster.
Please share the optimal method for allocating Spark's resources in this
scenario.
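
A minimal sketch of the kind of per-session cap I have in mind, written in
PySpark since the same confs can be passed through sparklyr's spark_config()
(the master URL and values are illustrative and assume a standalone master
running on the node):

from pyspark.sql import SparkSession

# Sketch only: each user's session asks the single-node standalone master for
# a bounded slice of the machine; an application that cannot get its resources
# waits at the master instead of failing outright.
spark = (
    SparkSession.builder
    .appName("rstudio-user-session")        # illustrative name
    .master("spark://mynode:7077")          # assumed standalone master URL
    .config("spark.cores.max", "4")         # cap cores per user application
    .config("spark.executor.memory", "8g")  # cap memory per executor
    .getOrCreate()
)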


Thanks,
Elango


Re: CSV parsing issue

2020-05-29 Thread elango vaidyanathan
Thanks Sean, got it.

Thanks,
Elango

On Thu, May 28, 2020, 9:04 PM Sean Owen  wrote:

> I don't think so; that data is inherently ambiguous and incorrectly
> formatted. If you know something about the structure, maybe you can rewrite
> the middle column manually to escape the inner quotes and then reparse.
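
A rough PySpark sketch of that approach, assuming every data row has the shape
<digits>,"{ ...json... }",<digits> (illustrative only, not tested against the
real file): read the rows as plain text, double the quotes inside the JSON
column, then let Spark reparse the repaired rows.

import re

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def fix_line(line):
    # Assumed row shape: <digits>,"{ ...json... }",<digits>
    m = re.match(r'^(\d+),"(\{.*\})",(\d+)$', line)
    if m is None:
        return line  # header row, or anything that does not match the pattern
    # CSV-style escaping: double every quote inside the JSON column
    inner = m.group(2).replace('"', '""')
    return '{0},"{1}",{2}'.format(m.group(1), inner, m.group(3))

raw = spark.read.text("/FileStore/tables/sample_file_structure.csv")
fixed = raw.rdd.map(lambda r: fix_line(r.value))
df1 = (spark.read.option("header", "true")
       .option("quote", '"')
       .option("escape", '"')
       .csv(fixed))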
>
> On Thu, May 28, 2020 at 10:25 AM elango vaidyanathan 
> wrote:
>
>> Is there any way I can handle it in code?
>>
>> Thanks,
>> Elango
>>
>> On Thu, May 28, 2020, 8:52 PM Sean Owen  wrote:
>>
>>> Your data doesn't escape double-quotes.
>>>
>>> On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan <
>>> elango...@gmail.com> wrote:
>>>
>>>>
>>>> Hi team,
>>>>
>>>> I am loading a CSV. One column contains a JSON value. I am unable to
>>>> parse that column properly. Below are the details. Can you please take a
>>>> look?
>>>>
>>>>
>>>>
>>>> val df1 = spark.read
>>>>   .option("inferSchema", "true")
>>>>   .option("header", "true")
>>>>   .option("quote", "\"")
>>>>   .option("escape", "\"")
>>>>   .csv("/FileStore/tables/sample_file_structure.csv")
>>>>
>>>>
>>>>
>>>> sample data:
>>>>
>>>> 
>>>>
>>>> column1,column2,column3
>>>>
>>>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "abcdef",   "language" : "en" }",11
>>>>
>>>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "ghi, jkl",   "language" : "en" }",12
>>>>
>>>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "mno, pqr",   "language" : "en" }",13
>>>>
>>>>
>>>>
>>>> output:
>>>>
>>>> ---
>>>>
>>>> +---------+------------------+-------------+
>>>> |  column1|           column2|      column3|
>>>> +---------+------------------+-------------+
>>>> |123456789|"{ "moveId" : "...| "dob" : null|
>>>> |123456789|"{ "moveId" : "...| "dob" : null|
>>>> +---------+------------------+-------------+
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Elango
>>>>
>>>


Re: CSV parsing issue

2020-05-28 Thread elango vaidyanathan
Is there any way I can handle it in code?

Thanks,
Elango

On Thu, May 28, 2020, 8:52 PM Sean Owen  wrote:

> Your data doesn't escape double-quotes.
>
> On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan 
> wrote:
>
>>
>> Hi team,
>>
>> I am loading a CSV. One column contains a JSON value. I am unable to
>> parse that column properly. Below are the details. Can you please take a look?
>>
>>
>>
>> val df1 = spark.read
>>   .option("inferSchema", "true")
>>   .option("header", "true")
>>   .option("quote", "\"")
>>   .option("escape", "\"")
>>   .csv("/FileStore/tables/sample_file_structure.csv")
>>
>>
>>
>> sample data:
>>
>> 
>>
>> column1,column2,column3
>>
>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "abcdef",   "language" : "en" }",11
>>
>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "ghi, jkl",   "language" : "en" }",12
>>
>> 123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "mno, pqr",   "language" : "en" }",13
>>
>>
>>
>> output:
>>
>> ---
>>
>> +---------+------------------+-------------+
>> |  column1|           column2|      column3|
>> +---------+------------------+-------------+
>> |123456789|"{ "moveId" : "...| "dob" : null|
>> |123456789|"{ "moveId" : "...| "dob" : null|
>> +---------+------------------+-------------+
>>
>>
>>
>> Thanks,
>> Elango
>>
>


CSV parsing issue

2020-05-28 Thread elango vaidyanathan
Hi team,

I am loading a CSV. One column contains a JSON value. I am unable to parse
that column properly. Below are the details. Can you please take a look?



val df1 = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("/FileStore/tables/sample_file_structure.csv")



sample data:



column1,column2,column3

123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" :
"abcdef",   "language" : "en" }",11

123456789,"{   "moveId" : "123456789",   "dob" : null,   "username" : "ghi,
jkl",   "language" : "en" }",12 123456789,"{   "moveId" : "123456789",
"dob" : null,   "username" : "mno, pqr",   "language" : "en" }",13



output:

---

+---------+------------------+-------------+
|  column1|           column2|      column3|
+---------+------------------+-------------+
|123456789|"{ "moveId" : "...| "dob" : null|
|123456789|"{ "moveId" : "...| "dob" : null|
+---------+------------------+-------------+



Thanks,
Elango