Ashley,

I want to suggest a few optimizations. The problem might go away, but at least performance should improve. The freeze problem could have many causes; the Spark UI SQL pages and stage detail pages would be useful here. You can send them privately, if you wish.

1. The repartition(1) should be replaced by coalesce(1). The former will shuffle all data, while the latter will read in the existing partitions and not shuffle them again (see the small sketch right after this list).
2. Repartitioning to a single partition is discouraged, unless you can guarantee the data fits into one worker's memory.
3. You can compute Insert and Update in one go, so that you don't have to join with df_reference twice.
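
For point 1, a minimal sketch of that swap, reusing the write line from your code below (path and options unchanged):

# coalesce(1) merges the existing partitions into one without a full shuffle;
# repartition(1) would shuffle every row into the single new partition first
df_output.coalesce(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')

And for point 3, the combined join could look like this (just a sketch):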

# left join: rows missing from the reference become Insert, rows with a changed
# hash become Update, unchanged rows keep a null _action
df_actions = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="left") \
    .withColumn('_action', when(col('b.hashkey').isNull(), 'Insert')
                           .otherwise(when(col('a.hashkey') != col('b.hashkey'), 'Update'))) \
    .select(col('_action'), *df_source_hashed) \
    .dropDuplicates() \
    .cache()

Since df_actions is cached, you can count inserts and updates quickly with only that one join in df_actions:

inserts_count = df_actions.where(col('_action') == 'Insert').count()
updates_count = df_actions.where(col('_action') == 'Update').count()

And you can get rid of the union:

df_output = df_actions.where(col('_action').isNotNull())

If you have to write that output to parquet anyway, then you can get the count quickly from the parquet file if it is partitioned by the _action column (Spark then only looks at the Parquet metadata to get the count; it does not read any rows):

df_output.coalesce(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')

df_output = sql_context.read.parquet('/path/to/output.parquet')

inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()

These are all just sketches, but I am sure you get the idea.

Enrico


On 13.02.20 05:08, Ashley Hoff wrote:
Hi,

I am currently working on an app using PySpark to produce a daily insert and update delta capture, output as Parquet. This is running on an 8 core, 32 GB Linux server in standalone mode (set to 6 worker cores of 2 GB memory each), running Spark 2.4.3.
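
For reference, the job is submitted to the standalone master roughly like this (the master URL, script name and exact executor split below are approximations, not the real values):

spark-submit \
  --master spark://server:7077 \
  --total-executor-cores 6 \
  --executor-memory 2g \
  daily_delta_capture.py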

This is being achieved by reading data from a TSQL database into a dataframe, appending a hash of all records to it, and comparing it to a dataframe built from yesterday's data (which has also been saved as parquet).

As part of the monitoring and logging, I am trying to count the number of records for the respective actions. Example code:

df_source = spark_session.read.format('jdbc').....
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
    .cache()

df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
    .select(lit('Insert').alias('_action'), *df_source_hashed) \
    .dropDuplicates() \
    .cache()
inserts_count = df_inserts.count()

df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
    .select(lit('Update').alias('_action'), *df_source_hashed) \
    .where(col('a.hashkey') != col('b.hashkey')) \
    .dropDuplicates() \
    .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)
df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')

The above code is running two occurrences concurrently via Python threading.Thread (this is to try and overcome the network bottleneck connecting to the database server).
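
Roughly, the concurrency wrapper looks like this (simplified; run_capture() and the source names are stand-ins for the real code):

import threading

def run_capture(source_table):
    # read from JDBC, hash, compare against the reference parquet and write the
    # output (i.e. the example code above, parameterised by source table)
    ...

# two captures run concurrently against the same SparkContext to keep the
# JDBC link busy
threads = [threading.Thread(target=run_capture, args=(name,))
           for name in ('source_a', 'source_b')]
for t in threads:
    t.start()
for t in threads:
    t.join()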

What I am finding is I am getting some very inconsistent behavior with the counts.  Occasionally, it appears that it will freeze up on a count operation for a few minutes and quite often that specific data frame will have zero records in it.  According to the DAG (which I am not 100% sure how to read) the following is the processing flow:

Exchange/ShuffledRowRDD [74] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [75] count at NativeMethodAccessorImpl.java:0
  => InMemoryTableScan/MapPartitionsRDD [78] count at NativeMethodAccessorImpl.java:0
  => MapPartitionsRDD [79] count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [80] count at NativeMethodAccessorImpl.java:0
  => Exchange/MapPartitionsRDD [81] count at NativeMethodAccessorImpl.java:0

The other observation I have found is that if I remove the counts from the data frame operations and instead open the outputted parquet file and count using a `sql_context.read.load('/path/to/output.parquet').filter(col("_action") == "Insert").count()` command, I am reducing my run-times by around 20 to 30%. In my feeble mind, opening up the outputs and re-reading them seems counter-intuitive.

Is anyone able to give me some guidance on why or how to ensure that I am doing the above as efficiently as possible?

Best Regards
Ashley

