Hi Karthick,

The problem seems to be that you built three DataFrames from the same
transformation "recipe" without materialising them, and then wrote back to
the very table they read from. Each MERGE re-evaluated its recipe at a
different time, so each one saw a different snapshot of the target table,
which gives flaky or empty results.
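
As a rough plain-Python analogy (this is not Spark code; the changed() and
expire() helpers and the row layout are invented purely for illustration),
here is why a recipe that is re-run after a write can come back empty, and
why taking a snapshot first avoids that:

```python
def changed(source, target):
    """Lazy 'recipe': recomputed against the live target on every call."""
    current = {t["id"]: t for t in target if t["is_current"]}
    return [s for s in source
            if s["id"] in current and current[s["id"]]["hash"] != s["hash"]]


def expire(rows, target):
    """First 'write': mark the matching current rows as no longer current."""
    ids = {r["id"] for r in rows}
    for t in target:
        if t["id"] in ids and t["is_current"]:
            t["is_current"] = False


source = [{"id": 1, "hash": "new"}]

# Without freezing: the recipe is evaluated again after the first write.
target = [{"id": 1, "hash": "old", "is_current": True}]
expire(changed(source, target), target)
lazy_inserts = changed(source, target)   # re-run against the modified target

# With freezing: evaluate once, then reuse the same rows for both writes.
target = [{"id": 1, "hash": "old", "is_current": True}]
snapshot = changed(source, target)       # materialised before any write
expire(snapshot, target)
frozen_inserts = snapshot

print(lazy_inserts)    # []
print(frozen_inserts)  # [{'id': 1, 'hash': 'new'}]
```

The second write in the unfrozen case finds nothing to insert, because the
first write already changed what the recipe reads.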

Fix (short + sweet):

   1. Freeze joined_df and each final DF *before any writes* (e.g., persist()
   followed by count(), or localCheckpoint(eager=True)).
   2. Or write the three sources to temp tables first, then MERGE from those.


The example below is built on Parquet, but it shows the concept:


# 0)  Start Spark & clean a folder
from pyspark.sql import SparkSession, functions as F
import shutil

spark = SparkSession.builder.appName("FreezeDemo-Parquet").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

path = "/tmp/demo_table_parquet"
shutil.rmtree(path, ignore_errors=True)

# 1) Seed data
spark.createDataFrame([(1,"A"), (2,"B")], "id INT, val STRING") \
     .write.mode("overwrite").parquet(path)

# 2) Build DF BEFORE first append (will NOT see id=3 because Parquet path
#    listing was captured)
df1 = spark.read.parquet(path).filter(F.col("id") > 0)
print("== Old DF (built before append) — no id=3 ==")
df1.show()

# 3) Append id=3 and build a NEW DF (this one sees 1,2,3)
spark.createDataFrame([(3,"C")], "id INT, val STRING") \
     .write.mode("append").parquet(path)
df2 = spark.read.parquet(path).filter(F.col("id") > 0)
print("== New DF (after append) — has id=3 ==")
df2.show()

# 4) HARD FREEZE df2 BEFORE any further appends
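# localCheckpoint() keeps its data on the executors; the checkpoint dir below
# is only required if you switch to the reliable df.checkpoint()
spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")  # any writable path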
df2_frozen = df2.localCheckpoint(eager=True)

# 5) Now append id=4 AFTER freezing
spark.createDataFrame([(4,"D")], "id INT, val STRING") \
     .write.mode("append").parquet(path)

print("== Frozen snapshot (should NOT include id=4) ==")
df2_frozen.show()   # expect only 1,2,3

# Optional: prove live data now has 1,2,3,4
print("== Fresh read (should include id=4) ==")
spark.read.parquet(path).orderBy("id").show()

Output (running Spark version 3.5.5):

== Old DF (built before append) — no id=3 ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  2|  B|
+---+---+

== New DF (after append) — has id=3 ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  3|  C|
|  2|  B|
+---+---+

== Frozen snapshot (should NOT include id=4) ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  3|  C|
|  2|  B|
+---+---+

== Fresh read (should include id=4) ==
+---+---+
| id|val|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
|  4|  D|
+---+---+
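
Applied to your job, the freeze step could be wrapped in a small helper like
the one below. This is only a sketch: freeze_all() and register_views() are
names I made up, and the values are assumed to be the three final DataFrames
from your code (type1_updates, expired_records, type2_changes). Bear in mind
that localCheckpoint() stores the rows on the executors, so the data is not
fault tolerant if an executor is lost.

```python
from typing import Any, Dict


def freeze_all(frames: Dict[str, Any]) -> Dict[str, Any]:
    """Eagerly materialise every DataFrame before the first write.

    Each value is expected to be a pyspark.sql.DataFrame. With eager=True,
    localCheckpoint() computes the rows immediately and truncates the
    lineage, so later actions reuse the stored rows rather than re-running
    the plan against the target table.
    """
    return {name: df.localCheckpoint(eager=True) for name, df in frames.items()}


def register_views(frozen: Dict[str, Any]) -> None:
    """Expose the frozen DataFrames to SQL, using the dict keys as view names."""
    for view_name, df in frozen.items():
        df.createOrReplaceTempView(view_name)
```

You would call freeze_all() once, with the view names as keys, after the
three DataFrames are built and before the first MERGE, then pass the result
to register_views() so that the MERGE and INSERT statements read the frozen
copies.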

HTH

Dr Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

View my LinkedIn profile:
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>





On Mon, 11 Aug 2025 at 08:12, Karthick N <kcekarth...@gmail.com> wrote:

> Hi *Ángel*,
>
> Thank you for checking on this. I’ll review the points you mentioned and
> get back to you with an update.
>
> Hi *Mich*,
> Looping you in here — could you please assist in reviewing this issue and
> share your inputs or suggestions? Your expertise would be really helpful in
> resolving it.
>
> Thanks in advance.
>
>
> On Mon, Aug 11, 2025 at 2:02 AM Ángel Álvarez Pascua <
> angel.alvarez.pas...@gmail.com> wrote:
>
>> Have you tried disabling AQE?
>>
>>
>> On Sun, 10 Aug 2025, 20:48, Karthick N <kcekarth...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I’m facing an issue with the execution order in the PySpark code snippet
>>> below. I’m not certain whether it’s caused by lazy evaluation, Spark plan
>>> optimization, or something else.
>>>
>>> *Issue:*
>>> For the same data and scenario, during some runs, one of the final views
>>> is not returning any data. This appears to be due to changes in the
>>> execution order, which in turn affects the final result. In the final
>>> steps, we have three different DataFrames derived from the same base
>>> DataFrame, and I’m not sure if this could be the cause.
>>>
>>> I tried using the persist option to hold intermediate results and avoid
>>> potential lazy evaluation issues, but the problem still persists.
>>>
>>> Could you please review this issue and suggest a solution to ensure
>>> consistent execution order and results?
>>>
>>> *Note:* Please let me know if you need any clarification or additional
>>> information on this.
>>>
>>> Code:
>>> source_df = spark.sql(f"""
>>>                       SELECT * FROM {catalog_name}.{db_name}.{table_name
>>> }""")  # Sample source query
>>>
>>> data_df = source_df.persist()
>>>
>>> # COMMAND ----------
>>>
>>> type2_columns = [
>>> ]
>>> data_df = updateRowHashValues(data_df, type2_columns, primary_key)
>>>
>>> # COMMAND ----------
>>>
>>> target_df = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>>> ).filter(col("IsCurrent") == True)
>>> target_col_list = target_df.columns
>>> source_with_target_df = data_df.alias("src").join(target_df.alias("tgt"),
>>> on="PatientId", how="left")
>>> joined_df = source_with_target_df.persist()
>>> filtered_joined_df = joined_df.filter(col("tgt.DimTypeIIKey"
>>> ).isNull()).select([col("src." + c).alias(c) for c in data_df
>>> .columns]).drop("SourceTimeStamp")
>>>
>>> new_records_df = filtered_joined_df.filter((lower(col("ModifiedBy"
>>> )).contains("migration")) | (lower(col("ModifiedBy")).contains(
>>> "migrated")) | (lower(col("CreatedBy")).contains("migration")) |
>>> (lower(col("CreatedBy")).contains("migrated")))
>>>
>>> new_records_source_df = filtered_joined_df.alias("jdf").join(
>>> new_records_df.alias("nrd"),col("jdf.PatientId") == col("nrd.PatientId"
>>> ),how="left_anti").select([col("jdf." + c).alias(c) for c in
>>> filtered_joined_df.columns]).drop('SourceTimeStamp')
>>>
>>> # COMMAND ----------
>>>
>>> # is_check_banfield_data may change based on the environment
>>> if is_check_banfield_data and (not new_records_df.isEmpty()):
>>>     patient_df = spark.table(f"{banfield_catalog_name}.bfdw.patient"
>>> ).selectExpr("patientid as patient_patientid", "createdate as
>>> patient_createdate", "changedate as patient_changedate", "fw_modifiedts
>>> as patient_fw_modifiedts").withColumn('patient_patientid', upper(
>>> 'patient_patientid'))
>>>     banfield_patient_df = patient_df.persist()
>>>     window_spec = Window.partitionBy("patient_patientid").orderBy(col(
>>> "patient_changedate").desc(),col("patient_fw_modifiedts").desc())
>>>     banfield_patient_df = banfield_patient_df.withColumn("row_num",
>>> row_number().over(window_spec))
>>>     banfield_patient_df = banfield_patient_df.filter(col("row_num") == 1
>>> ).drop("row_num")
>>>     new_records_df = new_records_df.alias("new").join(
>>> banfield_patient_df.alias("pat"),col("new.PatientId") == col(
>>> "pat.patient_patientid"),how="left")
>>>     new_records_df = new_records_df.withColumn(
>>> "BusinessEffectiveStartDate",coalesce(col("pat.patient_createdate"),
>>> col("BusinessEffectiveStartDate"))).select("new.*",
>>> "BusinessEffectiveStartDate")
>>>     incoming_new_df = new_records_df.persist()
>>> else:
>>>     is_check_banfield_data = False
>>>
>>> # COMMAND ----------
>>>
>>> type2_changes = joined_df.filter((col("src.RowHashType2") != col(
>>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull()).select(
>>> "src.*")
>>> type1_changes = joined_df.filter((col("src.RowHashType2") == col(
>>> "tgt.RowHashType2")) & (col("src.RowHashType1") != col(
>>> "tgt.RowHashType1"))).select("src.*")
>>> expired_records = joined_df.filter((col("src.RowHashType2") != col(
>>> "tgt.RowHashType2")) & col("tgt.DimTypeIIKey").isNotNull() ).select(col(
>>> "tgt.*"), col("src.BusinessEffectiveStartDate").alias(
>>> "NewBusinessEffectiveEndDate")).withColumn("BusinessEffectiveEndDate",
>>> col("NewBusinessEffectiveEndDate")).withColumn("IsCurrent", lit(False
>>> )).drop("NewBusinessEffectiveEndDate")
>>>
>>> # COMMAND ----------
>>>
>>> max_key = spark.table(f"{catalog_name}.{db_name}.{table_name}"
>>> ).agg(spark_max(surrogate_key)).collect()[0][0] or 0
>>> starting_key = max_key + 1
>>> target_col_list = list(set(target_col_list) - {"DwCreatedYear",
>>> "DwCreatedMonth", "DwCreatedDay", "IsDataMigrated", "IsCurrent"} - set
>>> (surrogate_key.split(',')))
>>> type2_changes = type2_changes.select(target_col_list)
>>> if is_check_banfield_data:
>>>     type2_changes = type2_changes.unionByName(incoming_new_df)
>>>     patient_df.unpersist()
>>>     new_records_df.unpersist()
>>> type2_changes = type2_changes.unionByName(new_records_source_df)
>>> type2_changes = type2_changes.withColumn("IsDataMigrated",
>>> when(lower(col("ModifiedBy")).contains("migration") | lower(col(
>>> "ModifiedBy")).contains("migrated") | lower(col("CreatedBy")).contains(
>>> "migration") | lower(col("CreatedBy")).contains("migrated"),True
>>> ).otherwise(False))
>>> type2_changes = type2_changes.withColumn("BusinessEffectiveEndDate",
>>> lit("9999-12-31").cast("date")).withColumn("IsCurrent", 
>>> lit(True)).withColumn(surrogate_key,
>>> row_number().over(Window.orderBy("PatientId")) + starting_key - 1)
>>> type1_updates_columns = list(set(type1_changes.columns) - set(
>>> type2_columns))
>>> type1_updates = type1_changes.select(*type1_updates_columns)
>>> # These are the three final temp views that will be used in the MERGE
>>> # statements or inserts. In some runs, one of the views returns no data.
>>> expired_records.createOrReplaceTempView(
>>>     f"temp_updates_type2_expired_{db_name}_{table_name}")
>>> type2_changes.createOrReplaceTempView(f"temp_inserts_new_records_{
>>> db_name}_{table_name}")
>>> type1_updates.createOrReplaceTempView(f"temp_updates_type1_{db_name}_{
>>> table_name}")
>>>
>>> # COMMAND ----------
>>>
>>> # DBTITLE 1,Type1 column changes update
>>> existing_records_update = spark.sql(f"""MERGE INTO {catalog_name}.{
>>> db_name}.{table_name} AS tgt
>>> USING temp_updates_type1_{db_name}_{table_name} AS src
>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>> WHEN MATCHED THEN UPDATE SET
>>>     tgt.col1 = src.col1,
>>>     tgt.col2 = src.col2,
>>>     tgt.col3 = src.col3,
>>>     tgt.col4 = src.col4,
>>>     .
>>>     .
>>>     .
>>>     .
>>>     tgt.RowHashType1 = src.RowHashType1""")
>>> print(f"Total no of records updated due to Type1 columns update: {
>>> existing_records_update.select('num_updated_rows').collect()[0][0]}")
>>>
>>> # COMMAND ----------
>>>
>>> # DBTITLE 1,Update Expired Record
>>> update_expired_record = spark.sql(f"""MERGE INTO {catalog_name}.{db_name
>>> }.{table_name} AS tgt
>>> USING temp_updates_type2_expired_{db_name}_{table_name} AS src
>>> ON tgt.PatientId = src.PatientId AND tgt.IsCurrent = true
>>> WHEN MATCHED THEN UPDATE SET
>>>     tgt.IsCurrent = false,
>>>     tgt.BusinessEffectiveEndDate = src.BusinessEffectiveEndDate,
>>>     tgt.DwModifiedts = src.DwModifiedts,
>>>     tgt.DwCreatedYear = year(src.DwModifiedts),
>>>     tgt.DwCreatedMonth = month(src.DwModifiedts),
>>>     tgt.DwCreatedDay = day(src.DwModifiedts)""")
>>> print(log_message=f"Total no of records marked IsCurrent as False due
>>> to type2 columns update: {update_expired_record.select(
>>> 'num_updated_rows').collect()[0][0]}")
>>>
>>> # COMMAND ----------
>>>
>>> # DBTITLE 1,Insert new records as type2 value changed and first time data arrival
>>> new_records_insertion = spark.sql(f"""INSERT INTO {catalog_name}.{
>>> db_name}.{table_name} (
>>>     col1()
>>>     values(
>>>     col1 )
>>> FROM temp_inserts_new_records_{db_name}_{table_name}
>>> """)
>>> print(log_message=f"Total no of new records inserted: {
>>> new_records_insertion.select('num_inserted_rows').collect()[0][0]}")
>>>
>>> # COMMAND ----------
>>>
>>> source_df.unpersist()
>>> source_with_target_df.unpersist()
>>>
>>
