Re: PySpark error java.lang.IllegalArgumentException
Finally I was able to solve this issue by setting this conf: "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder". Thanks all!

On Sat, 8 Jul 2023 at 3:45 AM, Brian Huynh wrote:

> Hi Khalid,
>
> Elango mentioned the file works fine in another of our environments with the same driver and executor memory.
>
> Brian
>
> On Jul 7, 2023, at 10:18 AM, Khalid Mammadov wrote:
>
> Perhaps the parquet file that is in that folder is corrupted? To check, try to read that file with pandas or other tools to see if you can read it without Spark.
>
> On Wed, 5 Jul 2023, 07:25 elango vaidyanathan wrote:
>
>> Hi team,
>>
>> Any updates on the issue below?
>>
>> On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan wrote:
>>
>>> Hi all,
>>>
>>> I am reading a parquet file like this and it gives java.lang.IllegalArgumentException. However, I can work with other parquet files (such as the NYC taxi parquet files) without any issue. I have copied the full error log as well. Can you please check and let me know how to fix this?
>>>
>>> import pyspark
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("testPyspark").config("spark.executor.memory", "20g").config("spark.driver.memory", "50g").getOrCreate()
>>>
>>> df = spark.read.parquet("/data/202301/account_cycle")
>>> df.printSchema()  # works fine
>>> df.count()        # works fine
>>> df.show()         # getting below error
>>>
>>> >>> df.show()
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
>>> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema: struct<... account_status: string, currency_code: string, opened_dt: date ... 30 more fields>
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 540.6 KiB, free 26.5 GiB)
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
>>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
>>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString at NativeMethodAccessorImpl.java:0
>>> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
>>> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
>>> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
>>> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14 (showString at NativeMethodAccessorImpl.java:0)
>>> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
>>> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
>>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 38.1 KiB, free 26.5 GiB)
>>> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
>>> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
>>> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:1478
>>> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vecto
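For readers hitting the same Snappy failure, here is a minimal sketch of how the workaround above can be applied from PySpark. The temp path is a placeholder; org.xerial.snappy.tempdir just needs to point at a writable directory (not mounted noexec) where snappy-java can extract its native library.

# Minimal sketch of the workaround above; the temp directory path is a placeholder.
# org.xerial.snappy.tempdir controls where snappy-java extracts its native library.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("testPyspark")
    .config("spark.driver.extraJavaOptions",
            "-Dorg.xerial.snappy.tempdir=/my_user/temp_folder")
    .getOrCreate()
)

df = spark.read.parquet("/data/202301/account_cycle")
df.show()

# Depending on how the driver JVM is launched (client mode may start it before this
# config is read), the same option may instead need to go on the command line, e.g.
#   spark-submit --conf "spark.driver.extraJavaOptions=-Dorg.xerial.snappy.tempdir=/my_user/temp_folder" app.py
# or into spark-defaults.conf.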
Re: PySpark error java.lang.IllegalArgumentException
Hi team,

Any updates on the issue below?

On Mon, 3 Jul 2023 at 6:18 PM, elango vaidyanathan wrote:

> Hi all,
>
> I am reading a parquet file like this and it gives java.lang.IllegalArgumentException. However, I can work with other parquet files (such as the NYC taxi parquet files) without any issue. I have copied the full error log as well. Can you please check and let me know how to fix this?
>
> import pyspark
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("testPyspark").config("spark.executor.memory", "20g").config("spark.driver.memory", "50g").getOrCreate()
>
> df = spark.read.parquet("/data/202301/account_cycle")
> df.printSchema()  # works fine
> df.count()        # works fine
> df.show()         # getting below error
>
> >>> df.show()
> 23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
> 23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
> 23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema: struct<... account_status: string, currency_code: string, opened_dt: date ... 30 more fields>
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 540.6 KiB, free 26.5 GiB)
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString at NativeMethodAccessorImpl.java:0
> 23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
> 23/07/03 18:07:20 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
> 23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
> 23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14 (showString at NativeMethodAccessorImpl.java:0)
> 23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
> 23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
> 23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 38.1 KiB, free 26.5 GiB)
> 23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
> 23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
> 23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:1478
> 23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
> 23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks resource profile 0
> 23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes) taskResourceAssignments Map()
> 23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
> 23/07/03 18:07:20 INFO FileScanRDD: Reading File path: file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range: 0-134217728, partition values: [empty row]
> 23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 48)
> java.lang.IllegalArgumentException
>     at java.nio.Buffer.limit(Buffer.java:275)
>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
>     at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
>     at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
>     at java.io.DataInputStream.readFully(DataInputStream.java:195)
>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>     at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
>     at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
>     at org.apache.parquet.bytes.BytesInput.toInputStream(Byte
PySpark error java.lang.IllegalArgumentException
Hi all,

I am reading a parquet file like this and it gives java.lang.IllegalArgumentException. However, I can work with other parquet files (such as the NYC taxi parquet files) without any issue. I have copied the full error log as well. Can you please check and let me know how to fix this?

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("testPyspark").config("spark.executor.memory", "20g").config("spark.driver.memory", "50g").getOrCreate()

df = spark.read.parquet("/data/202301/account_cycle")
df.printSchema()  # works fine
df.count()        # works fine
df.show()         # getting below error

>>> df.show()
23/07/03 18:07:20 INFO FileSourceStrategy: Pushed Filters:
23/07/03 18:07:20 INFO FileSourceStrategy: Post-Scan Filters:
23/07/03 18:07:20 INFO FileSourceStrategy: Output Data Schema: struct<... account_status: string, currency_code: string, opened_dt: date ... 30 more fields>
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 540.6 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 46.0 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on mynode:41055 (size: 46.0 KiB, free: 26.5 GiB)
23/07/03 18:07:20 INFO SparkContext: Created broadcast 19 from showString at NativeMethodAccessorImpl.java:0
23/07/03 18:07:20 INFO FileSourceScanExec: Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
23/07/03 18:07:20 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
23/07/03 18:07:20 INFO DAGScheduler: Got job 13 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/07/03 18:07:20 INFO DAGScheduler: Final stage: ResultStage 14 (showString at NativeMethodAccessorImpl.java:0)
23/07/03 18:07:20 INFO DAGScheduler: Parents of final stage: List()
23/07/03 18:07:20 INFO DAGScheduler: Missing parents: List()
23/07/03 18:07:20 INFO DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 38.1 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 10.5 KiB, free 26.5 GiB)
23/07/03 18:07:20 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on mynode:41055 (size: 10.5 KiB, free: 26.5 GiB)
23/07/03 18:07:20 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:1478
23/07/03 18:07:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 14 (MapPartitionsRDD[42] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
23/07/03 18:07:20 INFO TaskSchedulerImpl: Adding task set 14.0 with 1 tasks resource profile 0
23/07/03 18:07:20 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 48) (mynode, executor driver, partition 0, PROCESS_LOCAL, 4890 bytes) taskResourceAssignments Map()
23/07/03 18:07:20 INFO Executor: Running task 0.0 in stage 14.0 (TID 48)
23/07/03 18:07:20 INFO FileScanRDD: Reading File path: file:///data/202301/account_cycle/account_cycle-202301-53.parquet, range: 0-134217728, partition values: [empty row]
23/07/03 18:07:20 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 48)
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:553)
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:71)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:286)
    at org.apache.parquet.bytes.BytesInput.toByteBuffer(BytesInput.java:237)
    at org.apache.parquet.bytes.BytesInput.toInputStream(BytesInput.java:246)
    at org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary.<init>(PlainValuesDictionary.java:154)
    at org.apache.parquet.column.Encoding$1.initDictionary(Encoding.java:96)
    at org.apache.parquet.column.Encoding$5.initDictionary(Encoding.java:163)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.<init>(VectorizedColumnReader.java:114)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:352)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:293)
    at
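One quick sanity check, along the lines Khalid suggests elsewhere in the thread, is to read the part file named in the FileScanRDD log line directly with pyarrow/pandas; if that also fails, the file itself is likely corrupt rather than the Spark setup. A rough sketch (pyarrow and pandas are assumed to be installed):

# Rough sketch: try to decode the suspect part file outside Spark.
# The path is taken from the FileScanRDD log line above.
import pyarrow.parquet as pq

path = "/data/202301/account_cycle/account_cycle-202301-53.parquet"

table = pq.read_table(path)   # raises if the file (or its snappy-compressed pages) cannot be decoded
print(table.num_rows)
print(table.schema)
print(table.to_pandas().head())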
Single node spark issue in Sparkly/RStudio
Hi team,

On a single Linux node, I would like to set up RStudio with sparklyr. The dev team is three to four people, and I am aware of the constraints of a single-node Spark cluster. What I want to know is what happens when more users join in and use sparklyr from RStudio and Spark runs short of resources: it should simply keep the new jobs in a queue rather than crashing. I think this is not specific to RStudio/sparklyr; it applies equally to Spark/PySpark on a single-node cluster. Please share the best method for allocating Spark's resources in this scenario.

Thanks,
Elango
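One possible approach, offered here only as an illustration and not something confirmed in this thread: on a standalone single-node master, capping the cores and memory each user's application can take means additional sessions should wait at the master for free resources instead of failing. A PySpark-flavoured sketch with placeholder values (the master URL and all sizes are assumptions for an example 16-core box):

# Illustrative only: cap each user's application so concurrent sessions fit on one node
# and later applications queue at the master until cores free up.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("user1-session")
    .master("spark://mynode:7077")            # assumed standalone master on the single node
    .config("spark.cores.max", "4")           # at most 4 cores for this user's application
    .config("spark.executor.memory", "8g")    # bounded executor memory per user
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

The same caps can be placed in spark-defaults.conf (or in the sparklyr connection config) so every RStudio user inherits them; an application that cannot obtain cores should then sit in WAITING state at the master rather than taking the node down.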
Re: CSV parsing issue
Thanks Sean, got it.

Thanks,
Elango

On Thu, May 28, 2020, 9:04 PM Sean Owen wrote:

> I don't think so; that data is inherently ambiguous and incorrectly formatted. If you know something about the structure, maybe you can rewrite the middle column manually to escape the inner quotes and then reparse.
>
> On Thu, May 28, 2020 at 10:25 AM elango vaidyanathan wrote:
>
>> Is there any way I can handle it in code?
>>
>> Thanks,
>> Elango
>>
>> On Thu, May 28, 2020, 8:52 PM Sean Owen wrote:
>>
>>> Your data doesn't escape double-quotes.
>>>
>>> On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan <elango...@gmail.com> wrote:
>>>
>>>> Hi team,
>>>>
>>>> I am loading a CSV. One column contains a JSON value, and I am unable to parse that column properly. Below are the details. Can you please take a look?
>>>>
>>>> val df1 = spark.read.option("inferSchema", "true").option("header", "true").option("quote", "\"")
>>>>   .option("escape", "\"").csv("/FileStore/tables/sample_file_structure.csv")
>>>>
>>>> sample data:
>>>>
>>>> column1,column2,column3
>>>> 123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "abcdef", "language" : "en" }",11
>>>> 123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "ghi, jkl", "language" : "en" }",12
>>>> 123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "mno, pqr", "language" : "en" }",13
>>>>
>>>> output:
>>>>
>>>> +---------+--------------------+-------------+
>>>> |  column1|             column2|      column3|
>>>> +---------+--------------------+-------------+
>>>> |123456789|"{ "moveId" : "...  | "dob" : null|
>>>> |123456789|"{ "moveId" : "...  | "dob" : null|
>>>> +---------+--------------------+-------------+
>>>>
>>>> Thanks,
>>>> Elango
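To make Sean's suggestion concrete, here is a rough sketch of the rewrite-then-reparse idea. It assumes every row has exactly three columns with the JSON always in the middle, and it is written in PySpark rather than the Scala of the original snippet; the helper name fix_line is made up for this example.

# Rough sketch of Sean's suggestion: rewrite the middle column so its inner quotes
# are escaped (doubled), then reparse. Assumes exactly three columns per row with
# the JSON always in the middle.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csvReparse").getOrCreate()

def fix_line(line):
    # Split on the first and the last comma only, so commas inside the JSON survive.
    left, rest = line.split(",", 1)
    middle, right = rest.rsplit(",", 1)
    # Drop the outer quotes, double every inner quote, and re-wrap the field.
    inner = middle.strip()[1:-1].replace('"', '""')
    return '{},"{}",{}'.format(left, inner, right)

raw = spark.sparkContext.textFile("/FileStore/tables/sample_file_structure.csv")
header = raw.first()
fixed = raw.filter(lambda l: l != header).map(fix_line)

df1 = (spark.read
       .option("quote", '"')
       .option("escape", '"')
       .csv(fixed)
       .toDF("column1", "column2", "column3"))
df1.show(truncate=False)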
Re: CSV parsing issue
Is there any way I can handle it in code?

Thanks,
Elango

On Thu, May 28, 2020, 8:52 PM Sean Owen wrote:

> Your data doesn't escape double-quotes.
>
> On Thu, May 28, 2020 at 10:21 AM elango vaidyanathan wrote:
>
>> Hi team,
>>
>> I am loading a CSV. One column contains a JSON value, and I am unable to parse that column properly. Below are the details. Can you please take a look?
>>
>> val df1 = spark.read.option("inferSchema", "true").option("header", "true").option("quote", "\"")
>>   .option("escape", "\"").csv("/FileStore/tables/sample_file_structure.csv")
>>
>> sample data:
>>
>> column1,column2,column3
>> 123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "abcdef", "language" : "en" }",11
>> 123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "ghi, jkl", "language" : "en" }",12
>> 123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "mno, pqr", "language" : "en" }",13
>>
>> output:
>>
>> +---------+--------------------+-------------+
>> |  column1|             column2|      column3|
>> +---------+--------------------+-------------+
>> |123456789|"{ "moveId" : "...  | "dob" : null|
>> |123456789|"{ "moveId" : "...  | "dob" : null|
>> +---------+--------------------+-------------+
>>
>> Thanks,
>> Elango
CSV parsing issue
Hi team,

I am loading a CSV. One column contains a JSON value, and I am unable to parse that column properly. Below are the details. Can you please take a look?

val df1 = spark.read.option("inferSchema", "true").option("header", "true").option("quote", "\"")
  .option("escape", "\"").csv("/FileStore/tables/sample_file_structure.csv")

sample data:

column1,column2,column3
123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "abcdef", "language" : "en" }",11
123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "ghi, jkl", "language" : "en" }",12
123456789,"{ "moveId" : "123456789", "dob" : null, "username" : "mno, pqr", "language" : "en" }",13

output:

+---------+--------------------+-------------+
|  column1|             column2|      column3|
+---------+--------------------+-------------+
|123456789|"{ "moveId" : "...  | "dob" : null|
|123456789|"{ "moveId" : "...  | "dob" : null|
+---------+--------------------+-------------+

Thanks,
Elango
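For what it's worth, the escape option in the snippet above only helps if the inner quotes are actually escaped in the data. With quote and escape both set to ", a row the reader could parse cleanly would need the quotes inside the JSON doubled, roughly like this (illustrative only, not the actual file):

column1,column2,column3
123456789,"{ ""moveId"" : ""123456789"", ""dob"" : null, ""username"" : ""abcdef"", ""language"" : ""en"" }",11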