Hi Karthick,

The problem seems to be that the three final DataFrames were built as lazy transformations (a "recipe") without being materialised, and were then written back to the same target table they were derived from. Each MERGE re-evaluated its recipe at a different time, so each one saw a different snapshot of the table, which is why the results are flaky or empty.

Fix (short + sweet):
1. Freeze joined_df and each final DF *before any writes*, e.g. persist() followed by a count(), or localCheckpoint(eager=True). A sketch against your DataFrame names is just below.
2. Or write the three sources to temp tables first, then MERGE from those. A rough sketch of that pattern is at the bottom of this mail.
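To make option 1 concrete, here is a minimal sketch, assuming the names from the code quoted further down (joined_df, source_with_target_df, type2_changes, type1_updates, expired_records, db_name, table_name are all taken from your snippet). Treat it as a pattern rather than a drop-in replacement:

# Option 1 (sketch only) -- every name below is assumed from the code quoted further down.
# persist() alone is lazy, so either force it with a count() or use an eager localCheckpoint.

# Freeze the shared parent once, so every DataFrame derived from it sees the same rows
joined_df = source_with_target_df.localCheckpoint(eager=True)

# ... build type2_changes / type1_updates / expired_records from joined_df as you do now ...

# Freeze each final DataFrame BEFORE the first MERGE/INSERT touches the target table
type2_changes   = type2_changes.localCheckpoint(eager=True)
type1_updates   = type1_updates.localCheckpoint(eager=True)
expired_records = expired_records.localCheckpoint(eager=True)

# Only now register the temp views and run the three MERGE/INSERT statements
expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{db_name}_{table_name}")
type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{db_name}_{table_name}")
type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{table_name}")

Once the three inputs are frozen, the order in which the MERGE statements run no longer changes what they see.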
The example below is built on Parquet, but it shows the concept:

# 0) Start Spark & clean a folder
from pyspark.sql import SparkSession, functions as F
import shutil

spark = SparkSession.builder.appName("FreezeDemo-Parquet").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

path = "/tmp/demo_table_parquet"
shutil.rmtree(path, ignore_errors=True)

# 1) Seed data
spark.createDataFrame([(1, "A"), (2, "B")], "id INT, val STRING") \
    .write.mode("overwrite").parquet(path)

# 2) Build a DF BEFORE the first append (will NOT see id=3 because the Parquet path listing was captured)
df1 = spark.read.parquet(path).filter(F.col("id") > 0)
print("== Old DF (built before append) — no id=3 ==")
df1.show()

# 3) Append id=3 and build a NEW DF (this one sees 1, 2, 3)
spark.createDataFrame([(3, "C")], "id INT, val STRING") \
    .write.mode("append").parquet(path)
df2 = spark.read.parquet(path).filter(F.col("id") > 0)
print("== New DF (after append) — has id=3 ==")
df2.show()

# 4) HARD FREEZE df2 BEFORE any further appends
spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")  # any writable path
df2_frozen = df2.localCheckpoint(eager=True)

# 5) Now append id=4 AFTER freezing
spark.createDataFrame([(4, "D")], "id INT, val STRING") \
    .write.mode("append").parquet(path)

print("== Frozen snapshot (should NOT include id=4) ==")
df2_frozen.show()  # expect only 1, 2, 3

# Optional: prove the live data now has 1, 2, 3, 4
print("== Fresh read (should include id=4) ==")
spark.read.parquet(path).orderBy("id").show()

Output (running Spark version 3.5.5):

== Old DF (built before append) — no id=3 ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  2|  B|
+---+---+

== New DF (after append) — has id=3 ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  3|  C|
|  2|  B|
+---+---+

== Frozen snapshot (should NOT include id=4) ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  3|  C|
|  2|  B|
+---+---+

== Fresh read (should include id=4) ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
|  4|  D|
+---+---+

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


On Mon, 11 Aug 2025 at 08:12, Karthick N <kcekarth...@gmail.com> wrote:

> Hi *Ángel*,
>
> Thank you for checking on this. I’ll review the points you mentioned and
> get back to you with an update.
>
> Hi *Mich*,
> Looping you in here — could you please assist in reviewing this issue and
> share your inputs or suggestions? Your expertise would be really helpful
> in resolving it.
>
> Thanks in advance.
>
>
> On Mon, Aug 11, 2025 at 2:02 AM Ángel Álvarez Pascua <angel.alvarez.pas...@gmail.com> wrote:
>
>> Have you tried disabling AQE?
>>
>>
>> On Sun, 10 Aug 2025 at 20:48, Karthick N <kcekarth...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I’m facing an issue with the execution order in the PySpark code snippet
>>> below. I’m not certain whether it’s caused by lazy evaluation, Spark plan
>>> optimization, or something else.
>>>
>>> *Issue:*
>>> For the same data and scenario, during some runs one of the final views
>>> does not return any data. This appears to be due to changes in the
>>> execution order, which in turn affects the final result. In the final
>>> steps, we have three different DataFrames derived from the same base
>>> DataFrame, and I’m not sure if this could be the cause.
>>>
>>> I tried using the persist option to hold intermediate results and avoid
>>> potential lazy-evaluation issues, but the problem still persists.
>>>
>>> Could you please review this issue and suggest a solution to ensure
>>> consistent execution order and results?
>>>
>>> *Note:* Please let me know if you need any clarification or additional
>>> information on this.
>>>
>>> Code:
>>>
>>> source_df = spark.sql(f"""
>>>     SELECT * FROM {catalog_name}.{db_name}.{table_name}""")  # Sample source query
>>>
>>> data_df = source_df.persist()
>>>
>>> # COMMAND ----------
>>>
>>> type2_columns = [
>>> ]
>>> data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>>>
>>> # COMMAND ----------
>>>
>>> target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}").filter(col("IsCurrent") == True)
>>> target_col_list = target_df.columns
>>> source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"), on="PatientId", how="left")
>>> joined_df = source_with_target_df.persist()
>>> filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey").isNull()).select([col("src." + c).alias(c) for c in data_df.columns]).drop("SourceTimeStamp")
>>>
>>> new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy")).contains("migration")) | (lower(col("ModifiedBy")).contains("migrated")) | (lower(col("CreatedBy")).contains("migration")) | (lower(col("CreatedBy")).contains("migrated")))
>>>
>>> new_records_source_df = filtered_joined_df.alias("jdf").join(new_records_df.alias("nrd"), col("jdf.PatientId") == col("nrd.PatientId"), how="left_anti").select([col("jdf." + c).alias(c) for c in filtered_joined_df.columns]).drop('SourceTimeStamp')
>>>
>>> # COMMAND ----------
>>>
>>> if is_check_banfield_data and (not new_records_df.isEmpty()):  # is_check_banfield_data may get changed based on the environment
>>>     patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient").selectExpr("patientid as patient_patientid", "createdate as patient_createdate", "changedate as patient_changedate", "fw_modifiedts as patient_fw_modifiedts").withColumn('patient_patientid', upper('patient_patientid'))
>>>     banfield_patient_df = patient_df.persist()
>>>     window_spec = Window.partitionBy("patient_patientid").orderBy(col("patient_changedate").desc(), col("patient_fw_modifiedts").desc())
>>>     banfield_patient_df = banfield_patient_df.withColumn("row_num", row_number().over(window_spec))
>>>     banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1).drop("row_num")
>>>     new_records_df = new_records_df.alias("new").join(banfield_patient_df.alias("pat"), col("new.PatientId") == col("pat.patient_patientid"), how="left")
>>>     new_records_df = new_records_df.withColumn("BusinessEffectiveStartDate", coalesce(col("pat.patient_createdate"), col("BusinessEffectiveStartDate"))).select("new.*", "BusinessEffectiveStartDate")
>>>     incoming_new_df = new_records_df.persist()
>>> else:
>>>     is_check_banfield_data = False
>>>
>>> # COMMAND ----------
>>>
>>> type2_changes = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select("src.*")
>>> type1_changes = joined_df.filter((col("src.RowHashType2") == col("tgt.RowHashType2")) & (col("src.RowHashType1") != col("tgt.RowHashType1"))).select("src.*")
>>> expired_records = joined_df.filter((col("src.RowHashType2") != col("tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(col("tgt.*"), col("src.BusinessEffectiveStartDate").alias("NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate", col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False)).drop("NewBusinessEffectiveEndDate")
>>>
>>> # COMMAND ----------
>>>
>>> max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}").agg(spark_max(surrogate_key)).collect()[0][0] or 0
>>> starting_key = max_key + 1
>>> target_col_list = list(set(target_col_list) - {"DwCreatedYear", "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set(surrogate_key.split(',')))
>>> type2_changes = type2_changes.select(target_col_list)
>>> if is_check_banfield_data:
>>>     type2_changes = type2_changes.unionByName(incoming_new_df)
>>>     patient_df.unpersist()
>>>     new_records_df.unpersist()
>>> type2_changes = type2_changes.unionByName(new_records_source_df)
>>> type2_changes = type2_changes.withColumn("IsDataMigrated", when(lower(col("ModifiedBy")).contains("migration") | lower(col("ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains("migration") | lower(col("CreatedBy")).contains("migrated"), True).otherwise(False))
>>> type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate", lit("9999-12-31").cast("date")).withColumn("IsCurrent", lit(True)).withColumn(surrogate_key, row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
>>> type1_updates_columns = list(set(type1_changes.columns) - set(type2_columns))
>>> type1_updates = type1_changes.select(*type1_updates_columns)
>>>
>>> # These are the three final temp views used in the MERGE/INSERT statements below.
>>> # In some runs, one of these views returns no data.
>>> expired_records.createOrReplaceTempView(f"temp_updates_type2_expired_{db_name}_{table_name}")
>>> type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{db_name}_{table_name}")
>>> type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{table_name}")
>>>
>>> # COMMAND ----------
>>>
>>> # DBTITLE 1,Type1 column changes update
>>> existing_records_update = spark.sql(f"""MERGE INTO {catalog_name}.{db_name}.{table_name} AS tgt
>>>     USING temp_updates_type1_{db_name}_{table_name} AS src
>>>     ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>     WHEN MATCHED THEN UPDATE SET
>>>         tgt.col1 = src.col1,
>>>         tgt.col2 = src.col2,
>>>         tgt.col3 = src.col3,
>>>         tgt.col4 = src.col4,
>>>         .
>>>         .
>>>         .
>>>         .
>>>         tgt.RowHashType1 = src.RowHashType1""")
>>> print(f"Total no of records updated due to Type1 columns update: {existing_records_update.select('num_updated_rows').collect()[0][0]}")
>>>
>>> # COMMAND ----------
>>>
>>> # DBTITLE 1,Update Expired Record
>>> update_expired_record = spark.sql(f"""MERGE INTO {catalog_name}.{db_name}.{table_name} AS tgt
>>>     USING temp_updates_type2_expired_{db_name}_{table_name} AS src
>>>     ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>>     WHEN MATCHED THEN UPDATE SET
>>>         tgt.IsCurrent = false,
>>>         tgt.BusinessEffectiveEndDate = src.BusinessEffectiveEndDate,
>>>         tgt.DwModifiedts = src.DwModifiedts,
>>>         tgt.DwCreatedYear = year(src.DwModifiedts),
>>>         tgt.DwCreatedMonth = month(src.DwModifiedts),
>>>         tgt.DwCreatedDay = day(src.DwModifiedts)""")
>>> print(log_message=f"Total no of records marked IsCurrent as False due to type2 columns update: {update_expired_record.select('num_updated_rows').collect()[0][0]}")
>>>
>>> # COMMAND ----------
>>>
>>> # DBTITLE 1,Insert new records as type2 value changed and first time data arrival
>>> new_records_insertion = spark.sql(f"""INSERT INTO {catalog_name}.{db_name}.{table_name} (
>>>     col1()
>>>     values(
>>>     col1 )
>>>     FROM temp_inserts_new_records_{db_name}_{table_name}
>>> """)
>>> print(log_message=f"Total no of new records inserted: {new_records_insertion.select('num_inserted_rows').collect()[0][0]}")
>>>
>>> # COMMAND ----------
>>>
>>> source_df.unpersist()
>>> source_with_target_df.unpersist()
>>>
>>
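PS: if you prefer option 2 mentioned at the top (write the three sources out first, then MERGE from those), the pattern would look roughly like the sketch below. It is only a sketch: the stg_* staging table names and the staging schema are placeholders, and catalog_name, db_name, table_name and the three DataFrames are taken from the code quoted above.

staging = f"{catalog_name}.{db_name}"   # placeholder: any schema you are allowed to write to

# Materialise the three inputs as real tables first; once written, their contents are fixed
expired_records.write.mode("overwrite").saveAsTable(f"{staging}.stg_expired_{table_name}")
type2_changes.write.mode("overwrite").saveAsTable(f"{staging}.stg_new_records_{table_name}")
type1_updates.write.mode("overwrite").saveAsTable(f"{staging}.stg_type1_{table_name}")

# Each MERGE/INSERT now reads an already-written table, so the earlier statements
# cannot change what the later ones see. Same pattern for the other two statements.
spark.sql(f"""
    MERGE INTO {catalog_name}.{db_name}.{table_name} AS tgt
    USING {staging}.stg_type1_{table_name} AS src
    ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
    WHEN MATCHED THEN UPDATE SET tgt.RowHashType1 = src.RowHashType1
""")

# Clean up the staging tables when the load is done
for t in (f"stg_expired_{table_name}", f"stg_new_records_{table_name}", f"stg_type1_{table_name}"):
    spark.sql(f"DROP TABLE IF EXISTS {staging}.{t}")

The staging write costs an extra pass over the data, but it removes any dependence on when each plan happens to be re-evaluated.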