Creating Custom Broadcast Join

2022-09-01 Thread Murali S
Hi,

I want to broadcast a DataFrame to all executors and perform an operation
similar to a join, except that it may return a different number of rows than
each partition contains and may use multiple rows to produce one output row.
I am trying to create a custom join operator for this use case. It would be
great if you could point me to similar code.
My plan was to build a HashedRelation from the DataFrame, broadcast that
HashedRelation, and then extract the internal rows in each partition on the
executors.
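For reference, here is a rough spark-shell sketch of the same idea using only the
public API, without touching HashedRelation or InternalRow. It assumes the broadcast
side fits on the driver; the column names ("k", "v", "x") and the sum-combine step
are invented for illustration:

```
// Assumes an active `spark` session (e.g. spark-shell); names are illustrative only.
import spark.implicits._

val small = Seq(("a", 1L), ("a", 2L), ("b", 10L)).toDF("k", "v")      // side to broadcast
val large = Seq(("a", 100L), ("b", 200L), ("c", 300L)).toDF("k", "x") // probe side

// Group the small side by key on the driver and broadcast the map to every executor.
val lookup = small.as[(String, Long)].collect()
  .groupBy(_._1).mapValues(_.map(_._2).toSeq).toMap
val bc = spark.sparkContext.broadcast(lookup)

// flatMap lets each probe row produce zero, one, or many output rows, and all
// matching broadcast rows for a key can be combined (here: summed) into one row.
val joined = large.as[(String, Long)].rdd.flatMap { case (k, x) =>
  bc.value.get(k).map(vs => (k, x, vs.sum))
}.toDF("k", "x", "v_sum")

joined.show()
```

A HashedRelation-based version would follow the same shape, just holding InternalRows
in Spark's internal hash structure instead of a plain Scala Map, at the cost of
depending on unstable internal APIs.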

Thanks
Mura


Re: Spark 3.3.0/3.2.2: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 15

2022-09-01 Thread FengYu Cao
I will open a JIRA, but since it's from our production event log, I can't
attach the file to it.

I'll try to set up a debugger to provide more information.
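In case it helps narrow things down before the JIRA, here is a rough spark-shell
sketch (the directory path is a placeholder) that scans the Parquet files one at a
time so the failing one can be identified:

```
import org.apache.hadoop.fs.Path
import scala.util.Try

// Placeholder input directory; list the Parquet files and decode each one on its own.
val dir = new Path("/path/to/event_log")
val fs  = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val files = fs.listStatus(dir).map(_.getPath.toString).filter(_.endsWith(".parquet"))

files.foreach { f =>
  // A full scan forces every page header in the file to be read.
  if (Try(spark.read.parquet(f).foreach(_ => ())).isFailure) println(s"decode failed: $f")
}
```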

On Thu, Sep 1, 2022 at 23:06, Chao Sun wrote:

> Hi Fengyu,
>
> Do you still have the Parquet file that caused the error? Could you
> open a JIRA and attach the file to it? I can take a look.
>
> Chao
>
> On Thu, Sep 1, 2022 at 4:03 AM FengYu Cao  wrote:
> >
> > I'm trying to upgrade our Spark (currently 3.2.1), but with Spark 3.3.0 and
> > Spark 3.2.2 we get an error with a specific Parquet file.
> >
> > Is anyone else having the same problem? Or do I need to provide more
> > information to the devs?
> >
> > ```
> > [stack trace snipped; it appears in full in the original post later in this digest]
> > ```

Re: running pyspark on kubernetes - no space left on device

2022-09-01 Thread Qian SUN
Hi,

Spark provides the spark.local.dir configuration to specify the scratch/work
directory on the pod. You can set spark.local.dir to your mount path.
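For example, a minimal sketch (the mount path is a placeholder, and it assumes the
volume is already mounted on both the driver and executor pods, e.g. via the
spark.kubernetes.*.volumes.* options described in the Kubernetes docs):

```
import org.apache.spark.sql.SparkSession

// Placeholder mount path. On Kubernetes this property is usually supplied at launch
// time (spark-submit --conf spark.local.dir=/spark-scratch) so it takes effect before
// any executors start; setting it in the builder only helps if no session exists yet.
val spark = SparkSession.builder()
  .appName("local-dir-on-mounted-volume")
  .config("spark.local.dir", "/spark-scratch") // scratch space for shuffle spill, block data, etc.
  .getOrCreate()
```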

Best regards

On Thu, Sep 1, 2022 at 21:16, Manoj GEORGE wrote:

> CONFIDENTIAL & RESTRICTED
>
> Hi Team,
>
>
>
> I am new to spark, so please excuse my ignorance.
>
>
>
> Currently we are trying to run PySpark on a Kubernetes cluster. The setup is
> working fine for some jobs, but when we are processing a large file (36 GB),
> we run into out-of-space errors.
>
>
>
> Based on what we found on the internet, we have mapped the local dir to a
> persistent volume. This still doesn't solve the issue.
>
>
>
> I am not sure if it is still writing to the /tmp folder on the pod. Is there
> some other setting which needs to be changed for this to work?
>
>
>
> Thanks in advance.
>
>
>
>
>
>
>
> Thanks,
>
> Manoj George
>
> *Manager Database Architecture*​
> M: +1 3522786801
>
> manoj.geo...@amadeus.com
>
> www.amadeus.com


-- 
Best!
Qian SUN


Re: Spark 3.3.0/3.2.2: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 15

2022-09-01 Thread Chao Sun
Hi Fengyu,

Do you still have the Parquet file that caused the error? Could you
open a JIRA and attach the file to it? I can take a look.

Chao

On Thu, Sep 1, 2022 at 4:03 AM FengYu Cao  wrote:
>
> I'm trying to upgrade our Spark (currently 3.2.1), but with Spark 3.3.0 and
> Spark 3.2.2 we get an error with a specific Parquet file.
>
> Is anyone else having the same problem? Or do I need to provide more
> information to the devs?
>
> ```
> [stack trace snipped; it appears in full in the original post later in this digest]
> ```
>
> similar to https://issues.apache.org/jira/browse/SPARK-11844, but we 

Re: running pyspark on kubernetes - no space left on device

2022-09-01 Thread Matt Proetsch
Hi George,

You can try mounting a larger PersistentVolume at the work directory, as
described here, instead of relying on the default local dir, which might have
site-specific size constraints:

https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-kubernetes-volumes

-Matt

> On Sep 1, 2022, at 09:16, Manoj GEORGE  
> wrote:
> 
> 
> CONFIDENTIAL & RESTRICTED
> 
> Hi Team,
>  
> I am new to spark, so please excuse my ignorance.
>  
> Currently we are trying to run PySpark on a Kubernetes cluster. The setup is
> working fine for some jobs, but when we are processing a large file (36 GB),
> we run into out-of-space errors.
>  
> Based on what we found on the internet, we have mapped the local dir to a
> persistent volume. This still doesn't solve the issue.
>  
> I am not sure if it is still writing to the /tmp folder on the pod. Is there
> some other setting which needs to be changed for this to work?
>  
> Thanks in advance.
>  
>  
>  
> Thanks,
> Manoj George
> Manager Database Architecture​
> M: +1 3522786801
> manoj.geo...@amadeus.com
> www.amadeus.com​


running pyspark on kubernetes - no space left on device

2022-09-01 Thread Manoj GEORGE
CONFIDENTIAL & RESTRICTED

Hi Team,

I am new to spark, so please excuse my ignorance.

Currently we are trying to run PySpark on a Kubernetes cluster. The setup is
working fine for some jobs, but when we are processing a large file (36 GB),
we run into out-of-space errors.

Based on what we found on the internet, we have mapped the local dir to a
persistent volume. This still doesn't solve the issue.

I am not sure if it is still writing to the /tmp folder on the pod. Is there
some other setting which needs to be changed for this to work?

Thanks in advance.



Thanks,
Manoj George
Manager Database Architecture​
M: +1 3522786801
manoj.geo...@amadeus.com
www.amadeus.com​


Re: Moving to Spark 3x from Spark2

2022-09-01 Thread Martin Andersson
You should check the release notes and upgrade instructions.

From: rajat kumar 
Sent: Thursday, September 1, 2022 12:44
To: user @spark 
Subject: Moving to Spark 3x from Spark2




Hello Members,

We want to move to Spark 3 from Spark 2.4.

Are there any changes we need to make at the code level that could break the
existing code?

Will it work by simply changing the versions of Spark & Scala?

Regards
Rajat


Re: Moving to Spark 3x from Spark2

2022-09-01 Thread Khalid Mammadov
Hi Rajat

There were a lot of changes between those versions, and unfortunately the only
reliable way to assess the impact is to do your own testing.

Most probably you will have to make some changes to your codebase.
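For the dependency side of it, a build change along these lines is usually the
starting point (the versions below are just one example of a Spark 3.x / Scala 2.12
pairing); the behavioural differences are what the migration guides and your own
tests have to cover:

```
// build.sbt sketch (example versions only). Spark 3.x no longer supports Scala 2.11,
// so a project coming from Spark 2.4 on Scala 2.11 needs a Scala upgrade as well.
ThisBuild / scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.3.0" % Provided,
  "org.apache.spark" %% "spark-sql"  % "3.3.0" % Provided
)
```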

Regards
Khalid


On Thu, 1 Sept 2022, 11:45 rajat kumar,  wrote:

> Hello Members,
>
> We want to move to Spark 3 from Spark 2.4.
>
> Are there any changes we need to make at the code level that could break the
> existing code?
>
> Will it work by simply changing the versions of Spark & Scala?
>
> Regards
> Rajat
>


Spark 3.3.0/3.2.2: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 15

2022-09-01 Thread FengYu Cao
I'm trying to upgrade our Spark (currently 3.2.1), but with Spark 3.3.0 and
Spark 3.2.2 we get an error with a specific Parquet file.

Is anyone else having the same problem? Or do I need to provide more
information to the devs?

```

org.apache.spark.SparkException: Job aborted due to stage failure:
Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3
in stage 1.0 (TID 7) (10.113.39.118 executor 1): java.io.IOException:
can not read class org.apache.parquet.format.PageHeader: don't know
what type: 15
at org.apache.parquet.format.Util.read(Util.java:365)
at org.apache.parquet.format.Util.readPageHeader(Util.java:132)
at 
org.apache.parquet.hadoop.ParquetFileReader$Chunk.readPageHeader(ParquetFileReader.java:1382)
at 
org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1429)
at 
org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1402)
at 
org.apache.parquet.hadoop.ParquetFileReader.readChunkPages(ParquetFileReader.java:1023)
at 
org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:928)
at 
org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:972)
at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:338)
at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:293)
at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:196)
at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
at 
org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: shaded.parquet.org.apache.thrift.protocol.TProtocolException:
don't know what type: 15
at 
shaded.parquet.org.apache.thrift.protocol.TCompactProtocol.getTType(TCompactProtocol.java:894)
at 
shaded.parquet.org.apache.thrift.protocol.TCompactProtocol.readFieldBegin(TCompactProtocol.java:560)
at 
org.apache.parquet.format.InterningProtocol.readFieldBegin(InterningProtocol.java:155)
at 
shaded.parquet.org.apache.thrift.protocol.TProtocolUtil.skip(TProtocolUtil.java:108)
at 
shaded.parquet.org.apache.thrift.protocol.TProtocolUtil.skip(TProtocolUtil.java:60)
at 
org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1100)
at 
org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1019)
at org.apache.parquet.format.PageHeader.read(PageHeader.java:896)
at org.apache.parquet.format.Util.read(Util.java:362)
... 32 more


```

Similar to https://issues.apache.org/jira/browse/SPARK-11844, but we can
reproduce the error above.


*df = 

Moving to Spark 3x from Spark2

2022-09-01 Thread rajat kumar
Hello Members,

We want to move to Spark 3 from Spark 2.4.

Are there any changes we need to make at the code level that could break the
existing code?

Will it work by simply changing the versions of Spark & Scala?

Regards
Rajat