Creating Custom Broadcast Join

2022-09-01 Thread Murali S
Hi,

I want to broadcast a DataFrame to all executors and perform an operation
similar to a join, but one that may return a different number of rows than
the input partition has and may use multiple rows to produce a single
output row. I am trying to create a custom join operator for this use case.
It would be great if you could point me to similar code.
My thought was to build a HashedRelation from the DataFrame, broadcast that
HashedRelation, and then extract the internal rows in each partition on the
executors.
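
As a rough sketch of what I mean (using the public broadcast + mapPartitions
API rather than the internal HashedRelation machinery; the row types and
names below are only illustrative), something like this pasted into
spark-shell:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical row types for illustration only.
case class Small(key: Int, payload: String)
case class Large(key: Int, value: String)

val spark = SparkSession.builder().appName("custom-broadcast-join").getOrCreate()
import spark.implicits._

val smallDs = Seq(Small(1, "a"), Small(1, "b"), Small(2, "c")).toDS()
val largeDs = Seq(Large(1, "x"), Large(2, "y"), Large(3, "z")).toDS()

// Collect the small side on the driver and broadcast it as a plain map;
// this assumes the small side fits in driver/executor memory.
val smallByKey: Map[Int, Seq[String]] =
  smallDs.collect().groupBy(_.key).map { case (k, rows) => k -> rows.map(_.payload).toSeq }
val bcast = spark.sparkContext.broadcast(smallByKey)

// Each partition can emit zero, one, or many output rows per input row,
// and can combine several broadcast rows into one output row.
val result = largeDs.mapPartitions { iter =>
  val lookup = bcast.value
  iter.flatMap { row =>
    lookup.get(row.key).map(payloads => (row.key, row.value, payloads.mkString("|")))
  }
}
result.show()
```

This works, but I was hoping to reuse Spark's broadcast-hash-join machinery
(HashedRelation) rather than a plain collected map, which is why I am asking
for pointers to similar code.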

Thanks
Mura


Re: Spark 3.3.0/3.2.2: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 15

2022-09-01 Thread FengYu Cao
I will open a JIRA, but since the file comes from our production event log,
I can't attach it.

I'll try to set up a debugger to provide more information.

Chao Sun wrote on Thursday, September 1, 2022 at 23:06:

> Hi Fengyu,
>
> Do you still have the Parquet file that caused the error? Could you
> open a JIRA and attach the file to it? I can take a look.
>
> Chao
>
> On Thu, Sep 1, 2022 at 4:03 AM FengYu Cao  wrote:
> >
> > I'm trying to upgrade our Spark (currently on 3.2.1),
> >
> > but with Spark 3.3.0 and Spark 3.2.2 we hit an error with a specific
> > Parquet file.
> >
> > Is anyone else having the same problem? Or do I need to provide any
> > information to the devs?
> >
> > ```
> > org.apache.spark.SparkException: Job aborted due to stage failure: ...
> > java.io.IOException: can not read class org.apache.parquet.format.PageHeader:
> > don't know what type: 15
> > (full stack trace omitted here; see the original message below)
> > ```

Re: Spark 3.3.0/3.2.2: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 15

2022-09-01 Thread Chao Sun
Hi Fengyu,

Do you still have the Parquet file that caused the error? Could you
open a JIRA and attach the file to it? I can take a look.

Chao

On Thu, Sep 1, 2022 at 4:03 AM FengYu Cao  wrote:
>
> I'm trying to upgrade our Spark (currently on 3.2.1),
>
> but with Spark 3.3.0 and Spark 3.2.2 we hit an error with a specific
> Parquet file.
>
> Is anyone else having the same problem? Or do I need to provide any
> information to the devs?
>
> ```
> org.apache.spark.SparkException: Job aborted due to stage failure: ...
> java.io.IOException: can not read class org.apache.parquet.format.PageHeader:
> don't know what type: 15
> (full stack trace omitted here; see the original message below)
> ```
>
> similar to https://issues.apache.org/jira/browse/SPARK-11844, but we can
> reproduce the error above

Re: [Structured Streaming + Kafka] Reduced support for alternative offset management

2022-09-01 Thread Jungtaek Lim
Please consider DStream old-school technology and migrate to Structured
Streaming. There is little development effort going into DStream; the focus
is on Spark SQL and, for streaming workloads, Structured Streaming.
For Kafka integration, the guide is here:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
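
For example, a minimal Kafka source query with a checkpoint location looks
roughly like this (broker address, topic name, and checkpoint path are
placeholders):

```scala
// Read from Kafka with Structured Streaming; offsets are tracked in the checkpoint.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Write to the console sink; the checkpoint location is what makes the query recoverable.
val query = stream
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/my-topic-console")
  .start()

query.awaitTermination()
```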

All of these questions still apply to the Kafka integration in Structured
Streaming, though. The main reason we maintain our own checkpoint is to
guarantee fault tolerance: to provide fault-tolerant semantics, the query
must be able to replay exactly the same data from the latest successful
batch. That is neither feasible nor reliable if we rely on the Kafka commit
mechanism.

You can still easily write a custom streaming query listener that commits
the progress to Kafka separately, so that you can also leverage the Kafka
ecosystem. This project is an example:
https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer
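
A rough sketch of such a listener (not that project's actual code; the Kafka
source description check, group id, and broker address below are assumptions,
and error handling is omitted) could look like this:

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods.parse

// Sketch only: after each micro-batch, parse the Kafka source's endOffset JSON
// ({"topic":{"partition":offset}}) from the progress event and commit it to a
// separate consumer group, so Kafka-side tooling can track or rewind progress.
class KafkaOffsetCommitListener(bootstrapServers: String, groupId: String)
    extends StreamingQueryListener {

  private implicit val formats: Formats = DefaultFormats

  private lazy val consumer = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("group.id", groupId)
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    new KafkaConsumer[Array[Byte], Array[Byte]](props)
  }

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = consumer.close()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Crude filter for the Kafka source; the exact description string is an assumption.
    event.progress.sources.filter(_.description.contains("Kafka")).foreach { source =>
      val offsets = parse(source.endOffset).extract[Map[String, Map[String, Long]]]
      val toCommit = for {
        (topic, partitions) <- offsets
        (partition, offset) <- partitions
      } yield new TopicPartition(topic, partition.toInt) -> new OffsetAndMetadata(offset)
      consumer.commitSync(toCommit.asJava)
    }
  }
}

// Registration on the driver before starting the query (names are illustrative):
// spark.streams.addListener(new KafkaOffsetCommitListener("broker1:9092", "progress-group"))
```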

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Tue, Aug 30, 2022 at 5:05 PM Martin Andersson wrote:

> I was looking around for some documentation regarding how checkpointing
> (or rather, delivery semantics) is done when consuming from Kafka with
> Structured Streaming, and I stumbled across this old documentation (which
> still somehow exists in the latest versions) at
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#checkpoints.
>
>
> This page (which I assume is from around the time of Spark 2.4?) describes
> that storing offsets using checkpointing is the *least* reliable method
> and goes further into how to use Kafka or external storage to commit
> offsets.
>
> It also says
>
> If you enable Spark checkpointing, offsets will be stored in the
> checkpoint. (...) Furthermore, you cannot recover from a checkpoint if your
> application code has changed.
>
>
> This all leaves me with several questions:
>
>1. Is the above quote still true for Spark 3, that the checkpoint will
>break if you change the code? How about changing the subscribe pattern?
>
>    2. Why was the option to manually commit offsets asynchronously to
>    Kafka removed when it was deemed more reliable than checkpointing? Not to
>    mention that storing offsets in Kafka allows you to use all the tools
>    offered in the Kafka distribution to easily reset/rewind offsets on
>    specific topics, which doesn't seem to be possible when using checkpoints.
>
>    3. From a user perspective, storing offsets in Kafka offers more
>    features. From a developer perspective, having to re-implement offset
>    storage with checkpointing across several output systems (such as HDFS, AWS
>    S3 and other object storages) seems like a lot of unnecessary work and
>    re-inventing the wheel.
>    Is the discussion leading up to the decision to only support storing
>    offsets with checkpointing documented anywhere, perhaps in a JIRA?
>
> Thanks for your time
>


Spark 3.3.0/3.2.2: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 15

2022-09-01 Thread FengYu Cao
I'm trying to upgrade our Spark (currently on 3.2.1),

but with Spark 3.3.0 and Spark 3.2.2 we hit an error with a specific Parquet
file.

Is anyone else having the same problem? Or do I need to provide any
information to the devs?

```

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 7) (10.113.39.118 executor 1): java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 15
    at org.apache.parquet.format.Util.read(Util.java:365)
    at org.apache.parquet.format.Util.readPageHeader(Util.java:132)
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readPageHeader(ParquetFileReader.java:1382)
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1429)
    at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1402)
    at org.apache.parquet.hadoop.ParquetFileReader.readChunkPages(ParquetFileReader.java:1023)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:928)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:972)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:338)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:293)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:196)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: shaded.parquet.org.apache.thrift.protocol.TProtocolException: don't know what type: 15
    at shaded.parquet.org.apache.thrift.protocol.TCompactProtocol.getTType(TCompactProtocol.java:894)
    at shaded.parquet.org.apache.thrift.protocol.TCompactProtocol.readFieldBegin(TCompactProtocol.java:560)
    at org.apache.parquet.format.InterningProtocol.readFieldBegin(InterningProtocol.java:155)
    at shaded.parquet.org.apache.thrift.protocol.TProtocolUtil.skip(TProtocolUtil.java:108)
    at shaded.parquet.org.apache.thrift.protocol.TProtocolUtil.skip(TProtocolUtil.java:60)
    at org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1100)
    at org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1019)
    at org.apache.parquet.format.PageHeader.read(PageHeader.java:896)
    at org.apache.parquet.format.Util.read(Util.java:362)
    ... 32 more


```

Similar to https://issues.apache.org/jira/browse/SPARK-11844, but we can
reproduce the error above.


*df =