Hi all, I have a question about a query:
```
import org.apache.spark.sql.SaveMode

// sql#1
spark.sql("""
  select distinct
    cust_id,
    cast(b.device_name as varchar(200)) as devc_name_cast,
    prmry_reside_cntry_code
  from (select * from ${model_db}.crs_recent_30d_SF_dim_cust_info
        where dt = '${today}') a
  join fact_rsk_magnes_txn b
    on a.cust_id = b.customer_id
  where b.device_name <> '#'
    and b.device_name is not null
    and b.device_name <> '~'
""").registerTempTable("device_driver_info_0")

// sql#2
spark.sql("""
  select
    *,
    lower(regexp_replace(devc_name_cast, 'â', 'a')) as devc_name_norm
  from device_driver_info_0
""").registerTempTable("device_driver_info_1")

// sql#3
spark.sql("""
  select
    cust_id,
    devc_name_norm || '_' || prmry_reside_cntry_code as Device_Name_Country
  from device_driver_info_1
  where dt = '${today}'
""").registerTempTable("device_driver_info")

// sql#4
spark.sql("""
  select
    cust_id,
    Device_Name_Country
  from device_driver_info
  where Device_Name_Country is not null
  group by 1, 2
""").registerTempTable("device_name_SF_final_acct_info")

// sql#5
spark.sql("""
  select
    Device_Name_Country,
    count(distinct cust_id) as cust_cnt
  from device_name_SF_final_acct_info
  group by 1
""").registerTempTable("device_count_1")

spark.sql("""
  select * from device_count_1 where cust_cnt between 5 and 5000
""").registerTempTable("device_count")

// sql#6
spark.sql("""
  select
    b.cust_id,
    cast('Device_Name_Country' as varchar(100)) as network_source,
    cast(a.Device_Name_Country as varchar(100)) as network_source_value
  from device_count a
  left join device_name_SF_final_acct_info b
    on a.Device_Name_Country = b.Device_Name_Country
""").write
  .mode(SaveMode.Overwrite)
  .insertInto(s"$databaseName.$tableName")
```
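One side note from the Spark 2 → 3 move, hedged because I have not confirmed it is related to the failure: `registerTempTable` has been deprecated since Spark 2.0, and `createOrReplaceTempView` is the current API that registers the same kind of session-scoped view. For example, sql#1 could be registered as:

```scala
// Same pipeline step, using the non-deprecated view API (Spark 2.0+);
// the SQL text itself is unchanged
val deviceDriverInfo0 = spark.sql("""
  select distinct cust_id, cast(b.device_name as varchar(200)) as devc_name_cast,
         prmry_reside_cntry_code
  from (select * from ${model_db}.crs_recent_30d_SF_dim_cust_info
        where dt = '${today}') a
  join fact_rsk_magnes_txn b on a.cust_id = b.customer_id
  where b.device_name <> '#' and b.device_name is not null and b.device_name <> '~'
""")
deviceDriverInfo0.createOrReplaceTempView("device_driver_info_0")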
The problem: `Device_Name_Country` is composed as `devc_name_norm || '_' || prmry_reside_cntry_code` in sql#3, but that expression does not appear in the logical plan below, so execution throws an error. The same SQL runs successfully on Spark 2, while on Spark 3.1.2 it fails. Please help.
```
ShuffleQueryStage 6
+- Exchange hashpartitioning(cust_id#4030, Device_Name_Country#4099, 3001), ENSURE_REQUIREMENTS, [id=#2669]
   +- *(5) HashAggregate(keys=[cust_id#4030, Device_Name_Country#4099], functions=[], output=[cust_id#4030, Device_Name_Country#4099])
      +- CustomShuffleReader coalesced
         +- ShuffleQueryStage 3
            +- Exchange hashpartitioning(cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036, 3001), ENSURE_REQUIREMENTS, [id=#2376]
               +- *(3) HashAggregate(keys=[cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036], functions=[], output=[cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036])
                  +- *(3) Project [cust_id#4030, device_name#3453 AS devc_name_cast#4029, prmry_reside_cntry_code#4036]
                     +- *(3) BroadcastHashJoin [cust_id#4030], [customer_id#3431], Inner, BuildLeft, isnotnull(concat(concat(lower(regexp_replace(device_name#3453, â, a, 1)), _), prmry_reside_cntry_code#4036)), false
                        :- BroadcastQueryStage 0
                        :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#2132]
                        :     +- *(1) Filter isnotnull(cust_id#4030)
                        :        +- Scan hive unified_group_review_cri_group.crs_recent_30d_sf_dim_cust_info [cust_id#4030, prmry_reside_cntry_code#4036], HiveTableRelation [`unified_group_review_cri_group`.`crs_recent_30d_sf_dim_cust_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [cust_id#4030, acct_cre_dt#4031, is_guest_y_n#4032, prmry_email_domain#4033, cust_first_name#4034..., Partition Cols: [dt#4048], Pruned Partitions: [(dt=2023-03-23)]], [isnotnull(dt#4048), (dt#4048 = 2023-03-23)]
                        +- *(3) Project [customer_id#3431, device_name#3453]
                           +- *(3) Filter (((NOT (device_name#3453 = #) AND isnotnull(device_name#3453)) AND NOT (device_name#3453 = ~)) AND isnotnull(customer_id#3431))
                              +- *(3) ColumnarToRow
                                 +- FileScan parquet pp_risk_ops_qh_tables.magnes_fraudnet_login_raw[customer_id#3431,device_name#3453,ts#3603,event_dt#3604] Batched: true, DataFilters: [NOT (device_name#3453 = #), isnotnull(device_name#3453), NOT (device_name#3453 = ~), isnotnull(c..., Format: Parquet, Location: InMemoryFileIndex[gs://pypl-bkt-prd-row-std-gds-non-edw-tables/apps/risk/ads/rda/magnes_fraudnet_..., PartitionFilters: [isnotnull(event_dt#3604), (cast(event_dt#3604 as date) >= 19409), (event_dt#3604 <= 2023-03-23)], PushedFilters: [Not(EqualTo(device_name,#)), IsNotNull(device_name), Not(EqualTo(device_name,~)), IsNotNull(cust..., ReadSchema: struct
```
The error:

```
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ShuffleQueryStage 6
+- Exchange hashpartitioning(cust_id#13555, Device_Name_Country#13624, 3001), ENSURE_REQUIREMENTS, [id=#23666]
   +- *(5) HashAggregate(keys=[cust_id#13555, Device_Name_Country#13624], functions=[], output=[cust_id#13555, Device_Name_Country#13624])
      +- CustomShuffleReader coalesced
         +- ShuffleQueryStage 3
            +- Exchange hashpartitioning(cust_id#13555, devc_n
```
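One thing I notice: the `ShuffleQueryStage` / `CustomShuffleReader` nodes in the plan come from Adaptive Query Execution, which Spark 2 does not have in this form. As a sketch for narrowing the problem down (assuming the failure is AQE-related, which I have not confirmed), these session configs could be toggled before re-running the job:

```scala
// Disable Adaptive Query Execution to check whether the Spark 3.1.2
// failure is tied to AQE re-planning the shuffle stages
spark.conf.set("spark.sql.adaptive.enabled", "false")

// If disabling AQE entirely helps, the narrower partition-coalescing
// feature (the source of CustomShuffleReader coalesced) can be tested alone
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
```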