Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-14 Thread Gary Liu
Hi Mich,

The y-axis is the number of executors. The code ran on Dataproc Serverless
Spark, version 3.3.2.

I tried turning off autoscaling by setting the following:

spark.dynamicAllocation.enabled=false
spark.executor.instances=60

And I still got the FetchFailedException error. I wonder why it can run
without problems in a Vertex notebook in local mode, which has fewer
resources. Of course, it took much longer there (8 hours in local mode vs.
30 min in serverless).
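
For reference, a minimal sketch of how the two properties above might be collected and applied when a session is built from PySpark. The helper names static_allocation_conf and build_session and the app name are hypothetical. On Dataproc Serverless the properties may instead have to be supplied when the session is created, so this illustrates the settings rather than the exact mechanism used here.

```python
def static_allocation_conf(num_executors):
    """Return Spark properties that turn off dynamic allocation (hypothetical helper)."""
    if num_executors < 1:
        raise ValueError("num_executors must be positive")
    return {
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.instances": str(num_executors),
    }


def build_session(conf, app_name="fetch-failed-debug"):
    """Create a SparkSession with the given properties (app name is an assumption)."""
    # Imported lazily so the helper above can be used without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


conf = static_allocation_conf(60)
for key, value in sorted(conf.items()):
    print(f"{key}={value}")
```

Calling build_session(conf) would then start (or reuse) a session with a fixed executor count.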

Will try to break the job into smaller parts and see exactly which step
causes the problem.
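
One way to narrow this down, sketched in plain Python under the assumption that the job can be split into named steps. The runner run_steps and the stand-in steps are hypothetical and only simulate a failure; in the real job each step would be one join or the UDF stage.

```python
import time


def run_steps(steps):
    """Run (name, callable) pairs in order and stop at the first failure.

    Each callable receives the previous step's result (None for the first).
    Returns (name, status, detail, seconds) tuples so the failing step is visible.
    """
    report = []
    result = None
    for name, step in steps:
        started = time.perf_counter()
        try:
            result = step(result)
        except Exception as exc:
            report.append((name, "failed", repr(exc), time.perf_counter() - started))
            break
        report.append((name, "ok", "", time.perf_counter() - started))
    return report


# Stand-in steps (hypothetical); the second one simulates the failure.
def first_join(_):
    return ["a", "b"]


def apply_udf(rows):
    raise RuntimeError("simulated executor loss")


def second_join(rows):
    return rows


report = run_steps([
    ("first_join", first_join),
    ("apply_udf", apply_udf),
    ("second_join", second_join),
])
for name, status, detail, seconds in report:
    print(f"{name}: {status} {detail} ({seconds:.3f}s)")
```

Because Spark evaluates lazily, each real step would need to end in an action for a failure to surface in that step, for example writing the intermediate DataFrame with df.write.mode("overwrite").orc(path) and reading it back with spark.read.orc(path), where path is a placeholder.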

Thanks!

On Mon, Mar 13, 2023 at 11:26 AM Mich Talebzadeh 
wrote:

> Hi Gary
>
> Thanks for the update. So this is serverless Dataproc on 3.3.1. Maybe an
> autoscaling policy could be an option. What is the y-axis? Is that the capacity?
>
> Can you break down the join into multiple parts and save the intermediate
> result set?
>
>
> HTH
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.

Re: org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-13 Thread Gary Liu
Hi Mich,
I used the serverless Spark session, not local mode in the notebook, so
the machine type does not matter in this case. Below is the chart for the
serverless Spark session execution. I also tried increasing executor
memory and cores, but the issue did not get resolved. I will try shutting
down autoscaling and see what happens.
[image: Serverless Session Executors-4core.png]


On Fri, Mar 10, 2023 at 11:55 AM Mich Talebzadeh 
wrote:

> For your Dataproc setup, what type of machines are you using, for example
> n2-standard-4 with 4 vCPUs and 16GB, or something else? How many nodes, and
> is autoscaling turned on?
>
> Most likely an executor memory limit?
>
>
> HTH
>
> On Fri, 10 Mar 2023 at 15:35, Gary Liu  wrote:
>
>> Hi ,
>>
>> I have a job in a GCP Dataproc serverless Spark session (Spark 3.3.2). It
>> involves multiple joins as well as a complex UDF. I always get the
>> FetchFailedException below, but the job completes and the results look
>> right. Neither of the 2 input datasets is very big (one is 6.5M rows*11
>> columns, ~150M in orc format; the other is 17.7M rows*11 columns, ~400M in
>> orc format). It ran very smoothly in an on-premise Spark environment though.
>>
>> According to Google's document (
>> https://cloud.google.com/dataproc/docs/support/spark-job-tuning#shuffle_fetch_failures),
>> there are 3 solutions:
>> 1. Use EFM mode.
>> 2. Increase executor memory.
>> 3. Decrease the number of job partitions.
>>
>> 1. I started the session from a Vertex notebook, so I don't know how to
>> use EFM mode.
>> 2. I increased executor memory from the default 12GB to 25GB, and the
>> number of cores from 4 to 8, but it did not solve the problem.
>> 3. I wonder how to do this. Repartition the input datasets to have fewer
>> partitions? I used df.rdd.getNumPartitions() to check the input data
>> partitions; they have 9 and 17 partitions respectively. Should I decrease
>> them further? I also read a post on StackOverflow (
>> https://stackoverflow.com/questions/34941410/fetchfailedexception-or-metadatafetchfailedexception-when-processing-big-data-se),
>> saying that increasing partitions may help. Which one makes more sense? I
>> repartitioned the input data to 20 and 30 partitions, but still no luck.
>>
>> Any suggestions?
>>
>> 23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID 3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199, 36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
>> org.apache.spark.shuffle.FetchFailedException
>>  at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:312)
>>  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1180)
>>  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:918)
>>  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
>>  at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>>  at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
>>  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
>>  at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>>  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>>  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>  at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
>>  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
>>  at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
>>  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
>>  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
>>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartiti

org.apache.spark.shuffle.FetchFailedException in dataproc

2023-03-10 Thread Gary Liu
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.ExecutorDeadException: The relative remote executor(Id: 72), which maintains the block data to fetch is dead.
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:136)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:146)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:363)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1150)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1142)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:702)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:192)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:240)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
... 40 more

)
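
The root cause in the trace is a dead executor (Id: 72), so it may help to check whether the same executors keep showing up in these warnings. Below is a rough sketch that counts FetchFailed warnings per executor in driver log text. The function name failed_executors and the regular expression are assumptions based only on the one warning line in this thread, which is used as the sample.

```python
import re
from collections import Counter

# Matches the BlockManagerId(executorId, host, port, ...) part of a FetchFailed warning.
FETCH_FAILED = re.compile(
    r"FetchFailed\(BlockManagerId\((?P<executor>[^,]+),\s*(?P<host>[^,]+),\s*(?P<port>\d+)"
)


def failed_executors(log_text):
    """Count FetchFailed warnings per (executor id, host) in a block of log text."""
    # Join wrapped lines first, since mail clients may break one warning across lines.
    flat = " ".join(line.strip() for line in log_text.splitlines())
    return Counter((m["executor"], m["host"]) for m in FETCH_FAILED.finditer(flat))


sample = """23/03/10 14:32:19 WARN TaskSetManager: Lost task 58.1 in stage 27.0 (TID
3783) (10.1.0.116 executor 33): FetchFailed(BlockManagerId(72, 10.1.15.199,
36791, None), shuffleId=24, mapIndex=77, mapId=3457, reduceId=58, message=
org.apache.spark.shuffle.FetchFailedException"""

for (executor, host), count in failed_executors(sample).items():
    print(f"executor {executor} on {host}: {count} fetch failure(s)")
```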


-- 
Gary Liu


Re: may I need a join here?

2022-01-24 Thread Gary Liu
You can use a left anti join instead. isin accepts a list of values, not a
column.
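
A minimal sketch of that join, assuming the two DataFrames from the quoted session below (df with a word column, df2 with a stopword column). The function name drop_stopwords is hypothetical.

```python
def drop_stopwords(words_df, stopwords_df):
    """Keep only the rows of words_df whose `word` has no match in stopwords_df.stopword.

    A left anti join returns just the left side's columns, so the `count`
    column that both DataFrames share does not become ambiguous.
    """
    return words_df.join(
        stopwords_df,
        words_df["word"] == stopwords_df["stopword"],
        "left_anti",
    )
```

With the names from the session, this would be called as df3 = drop_stopwords(df, df2).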

On Mon, Jan 24, 2022 at 01:38 Bitfox  wrote:

> >>> df.show(3)
> +----+-----+
> |word|count|
> +----+-----+
> |  on|    1|
> | dec|    1|
> |2020|    1|
> +----+-----+
> only showing top 3 rows
>
> >>> df2.show(3)
> +--------+-----+
> |stopword|count|
> +--------+-----+
> |    able|    1|
> |   about|    1|
> |   above|    1|
> +--------+-----+
> only showing top 3 rows
>
> >>> df3=df.filter(~col("word").isin(df2.stopword ))
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/opt/spark/python/pyspark/sql/dataframe.py", line 1733, in filter
>     jdf = self._jdf.filter(condition._jc)
>   File "/opt/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1310, in __call__
>   File "/opt/spark/python/pyspark/sql/utils.py", line 117, in deco
>     raise converted from None
> pyspark.sql.utils.AnalysisException: Resolved attribute(s) stopword#4 missing from word#0,count#1L in operator !Filter NOT word#0 IN (stopword#4).;
> !Filter NOT word#0 IN (stopword#4)
> +- LogicalRDD [word#0, count#1L], false
>
> The filter method doesn't work here.
> Maybe I need to join the two DataFrames?
> What's the syntax for this?
>
> Thank you and regards,
> Bitfox
>
-- 
Gary Liu