[ https://issues.apache.org/jira/browse/SPARK-43818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-43818:
--------------------------------
    Language:   (was: English)

> Spark Glue job introduces duplicates while writing a dataframe as file to S3
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-43818
>                 URL: https://issues.apache.org/jira/browse/SPARK-43818
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.1.0
>         Environment: Production
>            Reporter: Umesh Kant
>            Priority: Major
>
> We have an AWS Glue (Spark) based ETL framework which processes data through
> multiple hops and finally writes the dataframe to an S3 bucket as Parquet
> files with Snappy compression. We have used this framework to process and
> write data to S3 for 1000+ tables/files and it works fine. But for two of the
> tables, the in-memory data frame contains the correct records, yet when the
> data frame is persisted to S3 as files it contains duplicate entries; and
> because the total record count stays the same, the duplicates also mean that
> records are missing.
> {*}Data Points{*}:
>  # This happens only for large tables (wide tables with millions of rows).
>  # When this happens we notice stage failures; the retries succeed but cause
> duplicate/missing records (the symptom can be confirmed with the sketch below).
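> A minimal, hypothetical PySpark sketch of how the symptom can be checked:
> count the distinct natural keys in the dataframe just before the write, then
> read the written Parquet back from S3 and compare. The key columns, S3 path
> and function name are placeholders, not part of the actual framework code.
> {code:python}
> # Hypothetical verification sketch; keys and the output path are placeholders.
> from pyspark.sql import DataFrame, SparkSession
>
> def check_write_for_duplicates(spark: SparkSession, df_active: DataFrame,
>                                keys: list, out_path: str) -> None:
>     """Write df_active as snappy Parquet and compare key counts before/after."""
>     before_total = df_active.count()
>     before_distinct = df_active.select(*keys).distinct().count()
>
>     df_active.write.mode("overwrite").option("compression", "snappy").parquet(out_path)
>
>     written = spark.read.parquet(out_path)
>     after_total = written.count()
>     after_distinct = written.select(*keys).distinct().count()
>
>     # Same total but fewer distinct keys after the write means the persisted
>     # files contain duplicates (and therefore some records are missing).
>     print(before_total, before_distinct, after_total, after_distinct)
> {code}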
> {*}Code Steps{*}:
> |Steps Information|Dataframe|Query / Operation / Action|
> |Query raw DB & get the number of partitions (to loop over one by one)| |select distinct partition_0 FROM <raw db>.<table name>|
> |Raw DF Query|raw|select SCHDWKID_REF, TASK_REF, LIFECYCLE_REF, TASK_DESC, WHOSE_ENT_NAME, WHOSE_INST_REF, WHOSE_INST_CDE, STENDDAT_STRTDT, STENDDAT_ENDDAT, AGENT_ENT_NAME, AGENT_INST_REF, AGENT_INST_CDE, AGENT_CODE, LOCATION_ENT_NAME, LOCATION_INST_REF, LOCATION_INST_CDE, CASEID_NUMBER, FACE_AMT, TAAR_AMT, AUTH_AMT, TRANSFER_YORN_ENCODE, TRANSFER_YORN_DECODE, TRANSFER_YORN_ELMREF, CASE_YORN_ENCODE, CASE_YORN_DECODE, CASE_YORN_ELMREF, CHANGEID_REF, CNTRCTID_REF, CNTRCTID_NUMBER, KTKDSCID_REF, KWNOFFID_REF, KWNOFFID_CODE, USERID_REF, USERID_CODE, WQUEUEID_REF, WQUEUEID_CODE, STATUS_REF, STATUS_CODE, STATUS_ASAT, LASTUPD_USER, LASTUPD_TERMNO, LASTUPD_PROG, LASTUPD_INFTIM, KWQPRIID_REF, KWQPRIID_CODE, INSURED_NAME, AGENT_NAME, EDM_INGESTED_AT, EDM_INGEST_TIME, PARTITION_0, DELTA_IND, TRANSACT_SEQ from RAW_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK where partition_0 = '20230428'|
> |Structured DF Query|structured|SELECT * FROM RL_LAKE_ORACLE_ORAP12_NYLDPROD60CL.SCHEDULED_WORK WHERE part_num > 0|
> | | | |
> |Merge DF generated by joining raw & structured on nks|df_merge|df_merge = structured.join(raw, keys, how='fullouter')|
> |action column will be added to the merge DF (see the merge/classification sketch after this table)|df_merge|df_merge = df_merge.withColumn("action", fn.when((((df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A') \| (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D')) & (df_merge['raw.chksum'].isNull()) & (~df_merge['structured.CHKSUM'].isNull())), "NOACTION").when((df_merge['structured.CHKSUM'].isNull()) & (df_merge['raw.delta_ind'] != 'D'), "INSERT").when((df_merge['structured.CHKSUM'] != df_merge['raw.chksum']) & (~df_merge['structured.CHKSUM'].isNull()) & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A') & ((df_merge['raw.delta_ind'] == 'U') \| (df_merge['raw.delta_ind'] == 'I')), "UPDATE").when(((df_merge['raw.delta_ind'] == 'D') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'A')), "DELETE").when(((df_merge['raw.delta_ind'] == 'D') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D')), "DELETECOPY").when(((df_merge['raw.delta_ind'] == 'I') & (df_merge['structured.EDH_RECORD_STATUS_IN'] == 'D') & (~df_merge['raw.chksum'].isNull()) & (~df_merge['structured.CHKSUM'].isNull())), "DELETEREINSERT").when(((df_merge['raw.delta_ind'] == 'D') & (df_merge['structured.CHKSUM'].isNull())), "DELETEABSENT").when((df_merge['structured.CHKSUM'] == df_merge['raw.chksum']), "NOCHANGE"))|
> | | | |
> |No Action df will be derived from the merge df|df_noaction|df_noaction = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where((df_merge.action == 'NOACTION') \| (df_merge.action == 'NOCHANGE'))|
> |Delete Copy df will be derived|df_dcopy|df_dcopy = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETECOPY')|
> |Delete Absent df will be derived|df_dabs|df_dabs = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEABSENT')|
> |Insert df will be derived|df_insert|df_insert = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'INSERT')|
> |Outdated df will be derived (records from structured for which we have updates)|df_outdated|df_outdated = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
> |Deleted records will be derived (inactive)|df_delrecIn|df_delrecIn = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
> |Deleted records will be derived (active)|df_delrecAc|df_delrecAc = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETE')|
> |Deleted & re-inserted records will be derived (inactive)|df_delReInsertInactive|df_delReInsertInactive = df_merge.select(keys + ['structured.' + x.upper() for x in structured_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
> |Deleted & re-inserted records will be derived (active)|df_delReInsertActive|df_delReInsertActive = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'DELETEREINSERT')|
> |Updated records df|df_update|df_update = df_merge.select(keys + ['raw.' + x.upper() for x in raw_cols_list if x.upper() not in keys]).where(df_merge.action == 'UPDATE')|
> | | | |
> |Update the active df by unioning all active dfs generated previously|df_active|df_active = df_insert.unionAll(df_update).unionAll(df_delrecAc).unionAll(df_delReInsertActive).unionAll(df_dabs)|
> |We will cache the df into memory|df_active|df_active = df_active.cache()|
> |Writing df_active to debug tables by storing it in df_active_before_index|df_active_before_index|df_active_before_index = df_active|
> |Adding surrogate key (sk) column values to the active df (see the dfZipWithIndex sketch after this table)|df_active|df_active = dfZipWithIndex(df_active, skey_last_val, sk_col_nme, 'part_num1', int(part_cnt), load_type)|
> |We will cache the df into memory|df_active|df_active = df_active.cache()|
> |Create a new df from the no-action & delete-copy dfs|df_active2|df_active2 = df_noaction.unionAll(df_dcopy)|
> |Union of the previously generated active2 & active dfs|df_active|df_active = df_active.unionAll(df_active2)|
> |Updated inactive df|df_inactive_final|df_inactive_final = df_outdated.unionAll(df_delrecIn).unionAll(df_delReInsertInactive)|
> |Adding to debug tables|df_merge, df_active_final, df_inactive_final, raw, structured, df_active2, df_active_before_index|These debug table steps were added to troubleshoot the duplicate/missing records issue. The (external) tables can be queried.|
> |Debug tables we can query|scheduled_work_active_bindex, scheduled_work_active, scheduled_work_active2, scheduled_work_before_write_active, scheduled_work_inactive, scheduled_work_merge, scheduled_work_raw, scheduled_work_structured|These debug table steps were added to troubleshoot the duplicate/missing records issue. The (external) tables can be queried.|
> |Change audit data types of the active & inactive dfs| |change_audit_datatype(fhz_df, structured_audit_col_names)|
> |Active data set: nulls replaced with blanks|df|df = df.fillna('')|
> |Active data set column renaming| |conf_df = df_col_rename(df, raw_all_col_n_partcol, structured_all_col_names)|
> |For varchar columns a trim function is applied to the active data set| |conf_df = conf_df.withColumn(i[0], fn.trim(conf_df[i[0]]))|
> |Active data set column renaming| |df = df.withColumnRenamed("PART_NUM", "part_num")|
> |Repartitioning the dataset based on the number of DPUs|df|df = df.repartition(no_of_dpu)|
> |Dropping unused columns from the active dataset|df| |
> |Delete partitions| |del_partition|
> |Delete active files| |del_File(bucket_name, del_path, part_col_cnt)|
> |Write the active data set (the same data that is written to the S3 structured path) to debug tables|df (debug table name = scheduled_work_before_write_active)|create_temp_table(df_dict, final_tbl, tbl_path, str_dir_path, 'ldap')|
> |Writing the active df to the structured layer|df|wrt_df|
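> As referenced in the "action column" row above, here is a minimal, runnable
> sketch of the full-outer-join merge and action classification pattern. It is a
> simplified illustration only, not the actual framework code: the key, checksum
> and status columns and the sample rows are placeholders.
> {code:python}
> # Simplified, hypothetical sketch of the merge-and-classify pattern above.
> from pyspark.sql import SparkSession, functions as fn
>
> spark = SparkSession.builder.appName("merge-sketch").getOrCreate()
> keys = ["SCHDWKID_REF"]  # assumed natural key
>
> # Tiny placeholder inputs standing in for the raw and structured dataframes.
> raw = spark.createDataFrame(
>     [(1, "r1", "I"), (2, "r2-new", "U"), (3, "r3", "D")],
>     ["SCHDWKID_REF", "chksum", "delta_ind"]).alias("raw")
> structured = spark.createDataFrame(
>     [(2, "r2-old", "A"), (3, "r3", "A"), (4, "r4", "A")],
>     ["SCHDWKID_REF", "CHKSUM", "EDH_RECORD_STATUS_IN"]).alias("structured")
>
> # Full outer join on the natural keys, as in the "Merge DF" step.
> df_merge = structured.join(raw, keys, how="fullouter")
>
> # Classify each merged row; a condensed version of the when() chain above.
> df_merge = df_merge.withColumn(
>     "action",
>     fn.when(fn.col("structured.CHKSUM").isNull()
>             & (fn.col("raw.delta_ind") != "D"), "INSERT")
>       .when((fn.col("raw.delta_ind") == "D")
>             & (fn.col("structured.EDH_RECORD_STATUS_IN") == "A"), "DELETE")
>       .when((fn.col("structured.CHKSUM") != fn.col("raw.chksum"))
>             & fn.col("structured.CHKSUM").isNotNull()
>             & (fn.col("structured.EDH_RECORD_STATUS_IN") == "A")
>             & fn.col("raw.delta_ind").isin("U", "I"), "UPDATE")
>       .when(fn.col("structured.CHKSUM") == fn.col("raw.chksum"), "NOCHANGE"))
>
> df_merge.show()
> {code}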
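> The dfZipWithIndex helper referenced in the "adding sk column values" row is
> framework code whose implementation is not shown in this issue. The following
> is only an assumed sketch of a common zipWithIndex-based surrogate-key pattern
> (with a simplified, hypothetical signature), plus the kind of snappy Parquet
> write described in the last rows; paths and variables are placeholders.
> {code:python}
> # Assumed sketch only -- not the framework's dfZipWithIndex implementation.
> from pyspark.sql import DataFrame, SparkSession
> from pyspark.sql.types import LongType, StructField, StructType
>
> def df_zip_with_index(df: DataFrame, offset: int, colname: str) -> DataFrame:
>     """Append a surrogate-key column = RDD zipWithIndex() index + offset + 1."""
>     spark = SparkSession.builder.getOrCreate()
>     new_schema = StructType(df.schema.fields + [StructField(colname, LongType(), False)])
>     rows = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1] + offset + 1,))
>     return spark.createDataFrame(rows, new_schema)
>
> # Hypothetical usage mirroring the surrogate-key and write steps above:
> # df_active = df_zip_with_index(df_active, skey_last_val, sk_col_nme)
> # df_active.repartition(no_of_dpu).write.mode("overwrite") \
> #     .option("compression", "snappy").parquet("s3://<bucket>/<structured path>/")
> {code}
> Note that zipWithIndex() assigns indexes from the current partition layout, so
> the generated keys depend on how the data happens to be partitioned and
> ordered when the helper runs.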



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
