Re: Job is not able to perform Broadcast Join

2020-10-06 Thread David Edwards
After adding the sequential ids you might need a repartition? I've found
before, when using monotonically_increasing_id, that the df ends up in a
single partition. It usually becomes clear in the Spark UI, though.
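
A minimal PySpark sketch of that repartition step (the stand-in data, the id
column name, and the partition count are assumptions, not anything from this
thread):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1_000_000).withColumnRenamed("id", "value")  # stand-in data

    # Add the non-consecutive ids, then repartition explicitly if the frame
    # ends up on a single partition (check the Spark UI or getNumPartitions()).
    df_ids = df.withColumn("mono_id", F.monotonically_increasing_id())
    df_ids = df_ids.repartition(200)  # assumed target partition count
    print(df_ids.rdd.getNumPartitions())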

On Tue, 6 Oct 2020, 20:38 Sachit Murarka,  wrote:

> Yes, I tried the same thing first as well. Then I moved to the join method
> because shuffle spill was happening, since row_number() without a partition
> clause runs on a single task. Instead of processing the entire dataframe on
> a single task, I have broken it down into df1 and df2 and am joining them,
> because df2 is a much smaller dataset as it has only 2 columns.
>
> Thanks
> Sachit

Re: Job is not able to perform Broadcast Join

2020-10-06 Thread Sachit Murarka
Yes, I tried the same thing first as well. Then I moved to the join method
because shuffle spill was happening, since row_number() without a partition
clause runs on a single task. Instead of processing the entire dataframe on a
single task, I have broken it down into df1 and df2 and am joining them,
because df2 is a much smaller dataset as it has only 2 columns.

Thanks
Sachit

On Wed, 7 Oct 2020, 01:04 Eve Liao,  wrote:

> Try to avoid broadcast. I thought this might be helpful:
> https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6

Re: Job is not able to perform Broadcast Join

2020-10-06 Thread Eve Liao
Try to avoid broadcast. I thought this might be helpful:
https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6
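
One broadcast-free way to get consecutive ids is zipWithIndex on the
underlying RDD (a rough sketch with stand-in data; the linked article may use
a different variant):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1_000_000).withColumnRenamed("id", "value")  # stand-in data

    # zipWithIndex assigns consecutive 0-based indexes without funnelling the
    # whole frame through a single-task window or a broadcast join.
    indexed = df.rdd.zipWithIndex().map(lambda pair: pair[0] + (pair[1],))
    df_with_id = indexed.toDF(df.columns + ["row_id"])
    df_with_id.show(5)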

On Tue, Oct 6, 2020 at 12:18 PM Sachit Murarka 
wrote:

> Thanks Eve for the response.
>
> Yes, I know we can use broadcast for smaller datasets. I increased the
> threshold (4 GB) as well, but even then it did not work, and df3 is somewhat
> larger than 2 GB.
>
> I am also trying without broadcast; the job has been running for an hour.
> Will let you know.
>
>
> Thanks
> Sachit

Re: Job is not able to perform Broadcast Join

2020-10-06 Thread Sachit Murarka
Thanks Eve for the response.

Yes, I know we can use broadcast for smaller datasets. I increased the
threshold (4 GB) as well, but even then it did not work, and df3 is somewhat
larger than 2 GB.

I am also trying without broadcast; the job has been running for an hour.
Will let you know.


Thanks
Sachit
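
For reference, the two settings involved here can be adjusted like this (the
values are simply the ones mentioned in this thread, not recommendations):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Size limit (in bytes) below which Spark broadcasts a join side automatically
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(4 * 1024**3))  # ~4 GB, as above
    # Broadcast timeout in seconds; the 1000 s value from the error quoted below
    spark.conf.set("spark.sql.broadcastTimeout", "1000")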

On Wed, 7 Oct 2020, 00:41 Eve Liao,  wrote:

> How many rows does df3 have? Broadcast joins are a great way to append
> data stored in relatively *small*, single-source-of-truth data files to
> large DataFrames. DataFrames up to 2 GB can be broadcast, so a data file
> with tens or even hundreds of thousands of rows is a broadcast candidate.
> Your broadcast variable is probably too large.

Re: Job is not able to perform Broadcast Join

2020-10-06 Thread Eve Liao
How many rows does df3 have? Broadcast joins are a great way to append data
stored in relatively *small*, single-source-of-truth data files to large
DataFrames. DataFrames up to 2 GB can be broadcast, so a data file with tens
or even hundreds of thousands of rows is a broadcast candidate. Your
broadcast variable is probably too large.
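
A quick way to check that (a sketch with stand-in frames; the df1/df3 naming
follows the quoted message below):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df1 = spark.range(10_000_000).withColumnRenamed("id", "mono_id")  # stand-in large frame
    df3 = spark.range(100_000).withColumnRenamed("id", "mono_id")     # stand-in broadcast candidate

    print(df3.count())  # how many rows the broadcast candidate really has
    print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    # The physical plan shows whether a BroadcastExchange / BroadcastHashJoin is used
    df1.join(F.broadcast(df3), "mono_id").explain()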

On Tue, Oct 6, 2020 at 11:37 AM Sachit Murarka 
wrote:

> Hello Users,
>
> I am facing an issue in a Spark job where I am doing row_number() without a
> partition by clause, because I need to add sequentially increasing IDs.
> But to avoid the large spill I am not doing row_number() over the complete
> data frame.
>
> Instead, I am applying monotonically_increasing_id() on the actual data set,
> then creating a new data frame from the original data frame which has just
> the monotonically_increasing_id column.
>
> So DF1 = all columns + monotonically_increasing_id
> DF2 = monotonically_increasing_id
>
> Now I am applying row_number() on DF2 since this is a smaller dataframe.
>
> DF3 = monotonically_increasing_id + row_number_id
>
> DF1.join(broadcast(DF3))
>
> This will give me a sequentially increasing id in the original dataframe.
>
> But below is the stack trace.
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o180.parquet.
> : org.apache.spark.SparkException: Job aborted.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> at
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.SparkException: Could not execute broadcast in
> 1000 secs. You can increase the timeout for broadcasts via
> spark.sql.broadcastTimeout or disable broadcast join by setting
> spark.sql.autoBroadcastJoinThreshold to -1
>
> Initially this timeout was 300 seconds. I already increased it (to the 1000
> seconds shown above).
>
>
> Kind Regards,
> Sachit Murarka
>
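
Putting the thread together, the approach described above looks roughly like
this in PySpark (a sketch only: the session setup, column names, and output
path are assumptions, not the original code):

    from pyspark.sql import SparkSession, Window, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(1_000_000).withColumnRenamed("id", "value")  # stand-in for the real data

    # DF1 = all columns + monotonically_increasing_id (unique, but not consecutive)
    df1 = df.withColumn("mono_id", F.monotonically_increasing_id())

    # DF2 = just the id column, so the single-task window below sees far less data
    df2 = df1.select("mono_id")

    # DF3 = mono_id + consecutive row_number(); with no partitionBy this still runs
    # on one task, but only over the narrow two-column frame
    df3 = df2.withColumn("row_id", F.row_number().over(Window.orderBy("mono_id")))

    # Join DF3 back onto DF1; broadcast() is the hint that hits the timeout above
    result = df1.join(F.broadcast(df3), "mono_id")
    result.write.mode("overwrite").parquet("/tmp/with_sequential_ids")  # hypothetical output path

The repartition suggested at the top of the thread would slot in right after
this join if the result ends up on too few partitions.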