Dear all,

I'm working on a case where, whenever a certain table is used in a broadcast join, the
query eventually fails with a "corrupt remote block" error.

First, we set spark.sql.autoBroadcastJoinThreshold to 10MB, i.e. 10485760
bytes:
[image: image.png]
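For reference, the threshold is expressed in bytes (10MB = 10 * 1024 * 1024 = 10485760). The Python sketch below compares it with the size that DESC EXTENDED reports for the problem table, 24452111 bytes. The helper would_auto_broadcast is hypothetical and not a Spark API: it mirrors our understanding that the planner broadcasts a join side when its estimated size is between 0 and the threshold, and that a threshold of -1 disables automatic broadcasting. Spark applies this check to its own size estimate, not to the number we pass in.

```python
# Hypothetical helper, not a Spark API: it mirrors our understanding of the
# size check behind spark.sql.autoBroadcastJoinThreshold. Spark applies the
# check to its own size estimate, not to the on-disk size we pass in here.
MB = 1024 * 1024


def would_auto_broadcast(size_in_bytes: int, threshold: int) -> bool:
    """Return True if a join side of this estimated size passes the size check."""
    if threshold < 0:
        # A negative threshold such as -1 disables automatic broadcasting.
        return False
    return 0 <= size_in_bytes <= threshold


threshold_10mb = 10 * MB  # the value we configured, 10485760 bytes
table_size = 24452111     # size of the table as reported by DESC EXTENDED

print(threshold_10mb)
print(round(table_size / MB, 1))
print(would_auto_broadcast(table_size, threshold_10mb))
```

By this arithmetic the table should not qualify at 10MB, so the fact that it is broadcast suggests the planner's estimate for that side of the join is smaller than 24452111 bytes.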

Then we ran the query. In the SQL plan, we found that one table that is about
25MB in size was broadcast as well, even though it is larger than the threshold.

[image: image.png]

DESC EXTENDED also reports the table as 24452111 bytes. It is a Hive table. We
always run into an error when this table is broadcast. Below is a sample
error:

Caused by: java.io.IOException: org.apache.spark.SparkException: corrupt remote block broadcast_477_piece0 of broadcast_477: 298778625 != -992055931
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)


I have also attached the physical plan below in case you're interested. One thing
to note: if I lower autoBroadcastJoinThreshold to 5MB, this query executes
successfully and default.product is NOT broadcast.


However, when I switch to another query that reads even fewer columns than
the previous one, this table is still broadcast even with the threshold at 5MB,
and the query fails with the same error. I even lowered it to 1MB, with the
same result.
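One possible explanation we have not confirmed: without column statistics, Spark 2.x seems to estimate the size of a column projection by scaling the table size by the ratio of the projected row width to the full row width, where a row width is 8 bytes of overhead plus a default size per column (20 for string, 16 for decimal(38,0)). If that is right, reading fewer columns from default.product shrinks the estimate that is compared against the threshold. The Python sketch below models this assumed rule. The 55-column layout of default.product is hypothetical (the expression IDs in the plan run from #100 to #154, which hints at it, but we have not checked the schema), and the 16 non-key columns read by the first query are assumed to be strings.

```python
# Hypothetical model of a size-only estimate for a column projection, ASSUMING
#   estimate = table_size * projected_row_width // full_row_width
# where a row width is 8 bytes of row overhead plus a default size per column.
# The per-type defaults are our assumption about Spark 2.x without column stats.
MB = 1024 * 1024
ROW_OVERHEAD = 8
DEFAULT_SIZE = {"string": 20, "decimal(38,0)": 16}


def row_width(column_types):
    """Estimated bytes per row for a list of column type names."""
    return ROW_OVERHEAD + sum(DEFAULT_SIZE[t] for t in column_types)


def projected_estimate(table_size, full_types, projected_types):
    """Scale the table size by the ratio of projected to full row width."""
    return table_size * row_width(projected_types) // row_width(full_types)


table_size = 24452111  # from DESC EXTENDED
# HYPOTHETICAL: 55 columns in default.product, all strings except product_key.
full = ["decimal(38,0)"] + ["string"] * 54
# 17 columns read in the attached plan; the 16 non-key ones ASSUMED to be strings.
first_query = ["decimal(38,0)"] + ["string"] * 16

estimate = projected_estimate(table_size, full, first_query)
print(estimate, 5 * MB < estimate <= 10 * MB)
# A narrower projection (key plus one string column) gives a smaller estimate.
print(projected_estimate(table_size, full, ["decimal(38,0)", "string"]) < estimate)
```

Under these assumptions the estimate for the first query lands between 5MB and 10MB, which would match it being broadcast at 10MB but not at 5MB, and a narrower projection gives an even smaller estimate. None of this explains the checksum mismatch itself.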


I'd appreciate any input you can share. Thank you very much.


Best Regards,

Mike

== Physical Plan ==
*(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, fiscal_week_start_date#220 AS period_start_date#472, fiscal_week_end_date#221 AS period_end_date#473, CASE WHEN isnull(fiscal_week_of_year#222) THEN 0 ELSE 1 END AS valid_date_flag#474, max_fiscal_date#451, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, bu_id#272 AS bu_code#475, bu_name#273, principle_supplier_code#154 AS supplier_code#476, mother_company_name#150 AS supplier_name#477, brand_type_name#117, brand_name#115, coalesce(h1_l1_hierarchy_code#125, -) AS Category_Code#478, coalesce(substring(h1_l1_hierarchy_code#125, 3, 2), -) AS Cate_No#479, h1_l1_hierarchy_name#126 AS Category_Name#480, coalesce(h1_l2_hierarchy_code#127, -) AS Department_Code#481, coalesce(substring(h1_l2_hierarchy_code#127, 7, 2), -) AS Dept_No#482, ... 48 more fields]
+- *(10) BroadcastHashJoin [bu_key#156], [bu_key#271], LeftOuter, BuildRight
   :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 41 more fields]
   :  +- *(10) BroadcastHashJoin [fiscal_week_of_year#449, fiscal_year#447], [fiscal_week_of_year#222, fiscal_year#234], LeftOuter, BuildRight
   :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 35 more fields]
   :     :  +- *(10) BroadcastHashJoin [store_id#157], [loc_idnt#521], LeftOuter, BuildRight
   :     :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 33 more fields]
   :     :     :  +- *(10) BroadcastHashJoin [cast(store_key#446 as double)], [cast(store_key#155 as double)], LeftOuter, BuildRight
   :     :     :     :- *(10) Project [product_key#445, store_key#446, fiscal_year#447, fiscal_month#448, fiscal_week_of_year#449, fiscal_year_week#450, max_fiscal_date#451, sales_amt_local#452, cogs_amt_local#453, gross_profit_amt_local#454, gross_margin_amt_local#455, adj_amt_local#456, compensation#457, qty#458, sales_amt_local_shrink#459, cogs_amt_local_shrink#460, qty_shrink#461, sales_amt_local_ly#462, cogs_amt_local_ly#463, gross_profit_amt_local_ly#464, gross_margin_amt_local_ly#465, adj_amt_local_ly#466, compensation_ly#467, qty_ly#468, ... 19 more fields]
   :     :     :     :  +- *(10) BroadcastHashJoin [cast(product_key#445 as double)], [cast(product_key#100 as double)], LeftOuter, BuildRight
   :     :     :     :     :- *(10) Project [coalesce(product_key#80, product_key#524) AS product_key#445, coalesce(store_key#81, store_key#525) AS store_key#446, coalesce(fiscal_year#83, cast(cast((cast(fiscal_year#527 as double) + 1.0) as int) as string)) AS fiscal_year#447, coalesce(fiscal_month#84, fiscal_month#528) AS fiscal_month#448, coalesce(fiscal_week_of_year#85, fiscal_week_of_year#529) AS fiscal_week_of_year#449, coalesce(fiscal_year_week#86, cast(cast((cast(fiscal_year_week#530 as double) + 100.0) as int) as string)) AS fiscal_year_week#450, coalesce(max_fiscal_date#87, max_fiscal_date#531) AS max_fiscal_date#451, coalesce(sales_amt_local#91, 0.0) AS sales_amt_local#452, coalesce(cogs_amt_local#92, 0.0) AS cogs_amt_local#453, coalesce(gross_profit_amt_local#93, 0.0) AS gross_profit_amt_local#454, (coalesce(gross_profit_amt_local#93, 0.0) + coalesce(compensation#95, 0.0)) AS gross_margin_amt_local#455, coalesce(adj_amt_local#94, 0.0) AS adj_amt_local#456, coalesce(compensation#95, 0.0) AS compensation#457, coalesce(qty#96, 0.0) AS qty#458, coalesce(sales_amt_local_shrink#97, 0.0) AS sales_amt_local_shrink#459, coalesce(cogs_amt_local_shrink#98, 0.0) AS cogs_amt_local_shrink#460, coalesce(qty_shrink#99, 0.0) AS qty_shrink#461, coalesce(sales_amt_local#535, 0.0) AS sales_amt_local_ly#462, coalesce(cogs_amt_local#536, 0.0) AS cogs_amt_local_ly#463, coalesce(gross_profit_amt_local#537, 0.0) AS gross_profit_amt_local_ly#464, (coalesce(gross_profit_amt_local#537, 0.0) + coalesce(compensation#539, 0.0)) AS gross_margin_amt_local_ly#465, coalesce(adj_amt_local#538, 0.0) AS adj_amt_local_ly#466, coalesce(compensation#539, 0.0) AS compensation_ly#467, coalesce(qty#540, 0.0) AS qty_ly#468, ... 3 more fields]
   :     :     :     :     :  +- *(10) Filter ((vs_max_sales_year#88 >= -2) || vs_max_sales_year#532 IN (-1,-2,-3))
   :     :     :     :     :     +- SortMergeJoin [product_key#80, store_key#81, (cast(fiscal_year#83 as double) - 1.0), fiscal_week_of_year#85], [product_key#524, store_key#525, cast(fiscal_year#527 as double), fiscal_week_of_year#529], FullOuter
   :     :     :     :     :        :- *(2) Sort [product_key#80 ASC NULLS FIRST, store_key#81 ASC NULLS FIRST, (cast(fiscal_year#83 as double) - 1.0) ASC NULLS FIRST, fiscal_week_of_year#85 ASC NULLS FIRST], false, 0
   :     :     :     :     :        :  +- Exchange hashpartitioning(product_key#80, store_key#81, (cast(fiscal_year#83 as double) - 1.0), fiscal_week_of_year#85, 200)
   :     :     :     :     :        :     +- *(1) FileScan parquet default.temp_sales_aggregate_wk[product_key#80,store_key#81,fiscal_year#83,fiscal_month#84,fiscal_week_of_year#85,fiscal_year_week#86,max_fiscal_date#87,vs_max_sales_year#88,sales_amt_local#91,cogs_amt_local#92,gross_profit_amt_local#93,adj_amt_local#94,compensation#95,qty#96,sales_amt_local_shrink#97,cogs_amt_local_shrink#98,qty_shrink#99] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_key:string,store_key:string,fiscal_year:string,fiscal_month:string,fiscal_week_of_...
   :     :     :     :     :        +- *(4) Sort [product_key#524 ASC NULLS FIRST, store_key#525 ASC NULLS FIRST, cast(fiscal_year#527 as double) ASC NULLS FIRST, fiscal_week_of_year#529 ASC NULLS FIRST], false, 0
   :     :     :     :     :           +- Exchange hashpartitioning(product_key#524, store_key#525, cast(fiscal_year#527 as double), fiscal_week_of_year#529, 200)
   :     :     :     :     :              +- *(3) FileScan parquet default.temp_sales_aggregate_wk[product_key#524,store_key#525,fiscal_year#527,fiscal_month#528,fiscal_week_of_year#529,fiscal_year_week#530,max_fiscal_date#531,vs_max_sales_year#532,sales_amt_local#535,cogs_amt_local#536,gross_profit_amt_local#537,adj_amt_local#538,compensation#539,qty#540,sales_amt_local_shrink#541,cogs_amt_local_shrink#542,qty_shrink#543] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_key:string,store_key:string,fiscal_year:string,fiscal_month:string,fiscal_week_of_...
   :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(38,0), true] as double)))
   :     :     :     :        +- *(5) FileScan parquet default.product[product_key#100,product_id#102,product_other_name#106,product_name#107,product_other_description#108,brand_name#115,brand_type_name#117,h1_l1_hierarchy_code#125,h1_l1_hierarchy_name#126,h1_l2_hierarchy_code#127,h1_l2_hierarchy_name#128,h1_l3_hierarchy_code#129,h1_l3_hierarchy_name#130,h1_l4_hierarchy_code#131,h1_l4_hierarchy_name#132,Mother_Company_Name#150,Principle_Supplier_Code#154] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_key:decimal(38,0),product_id:string,product_other_name:string,product_name:string,...
   :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(38,0), true] as double)))
   :     :     :        +- *(6) FileScan parquet default.store[store_key#155,bu_key#156,store_id#157,store_code#159,store_name#160,store_other_name#161,store_format_code#162,store_format_name#163,address_1#166,address_2#167,address_5#170,web_store_flag#184,comp_store_flag#188,h1_l2_hierarchy_name#193,h1_l5_hierarchy_name#199] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<store_key:decimal(38,0),bu_key:decimal(38,0),store_id:string,store_code:string,store_name:...
   :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   :     :        +- HiveTableScan [loc_idnt#521, latitude#522, longitude#523], HiveTableRelation `default`.`geo_tableau`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [loc_idnt#521, latitude#522, longitude#523]
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true], input[0, string, true]))
   :        +- *(8) HashAggregate(keys=[fiscal_year#234, fiscal_week_of_year#222, fiscal_week_start_date#220, fiscal_week_end_date#221, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266], functions=[])
   :           +- Exchange hashpartitioning(fiscal_year#234, fiscal_week_of_year#222, fiscal_week_start_date#220, fiscal_week_end_date#221, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266, 200)
   :              +- *(7) HashAggregate(keys=[fiscal_year#234, fiscal_week_of_year#222, fiscal_week_start_date#220, fiscal_week_end_date#221, vs_max_sales_year#270, vs_max_sales_month#267, vs_max_sales_week#266], functions=[])
   :                 +- *(7) FileScan parquet default.bl_business_date[FISCAL_WEEK_START_DATE#220,FISCAL_WEEK_END_DATE#221,FISCAL_WEEK_OF_YEAR#222,FISCAL_YEAR#234,vs_max_sales_week#266,vs_max_sales_month#267,vs_max_sales_year#270] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasb://utgsj3-asw-prod-ea-hdi-2018-11-23t12-04-25-2...@aswprodeasparkstorage.bl..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<FISCAL_WEEK_START_DATE:string,FISCAL_WEEK_END_DATE:string,FISCAL_WEEK_OF_YEAR:string,FISCA...
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,0), true]))
      +- *(9) FileScan parquet rkrdmx.d_rk_business_unit[BU_KEY#271,BU_ID#272,BU_NAME#273] Batched: true, Format: Parquet, Location: InMemoryFileIndex[wasbs://hiv...@aswprodeaskrkrdmx.blob.core.windows.net/hive/warehouse/rkrdmx.db..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<BU_KEY:decimal(38,0),BU_ID:string,BU_NAME:string>