Hi,

Thank you both for your suggestions!  These have been eye-openers for me.

Just to clarify, I need the counts for logging and auditing purposes;
otherwise I would exclude the step.  I should have also mentioned that
while I am processing around 30 GB of raw data, the individual outputs are
relatively small - in total across all files around 30 MB.

Firstly, I have identified the issue and it has nothing to do with Spark.
As part of my testing regime, I do a large file copy operation to stage
data (copying around 6 GB from one directory to the next).  Once that is
done, I kick off my process.  Looking at Cockpit, I noticed that after the
copy command had completed, there was a good 60+ seconds of intensive disk
IO.  In previous testing, this IO was still occurring when the first stages
of my script were running.  I've now extensively re-tested after letting
this IO drop off and I am no longer getting these freezes.  In reality
there is no benefit in the real world (i.e., waiting 90 seconds to save 60
seconds...), but at least I can explain why.
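
For anyone who wants to automate that wait in a test harness, here is a rough
sketch.  It assumes Linux and that the lingering IO is the kernel flushing
buffered writes, which shows up as the Dirty and Writeback lines in
/proc/meminfo.  The function names, the 10 MiB threshold and the timeouts are
my own placeholders, not anything from my actual scripts.

```python
import time


def pending_writeback_kib(meminfo_text: str) -> int:
    """Sum the Dirty and Writeback values (in kB) found in /proc/meminfo text."""
    total = 0
    for line in meminfo_text.splitlines():
        key, _, rest = line.partition(":")
        if key in ("Dirty", "Writeback"):
            total += int(rest.split()[0])
    return total


def wait_for_disk_quiet(threshold_kib: int = 10_240, poll_s: float = 2.0,
                        timeout_s: float = 180.0) -> bool:
    """Poll until pending writeback drops to the threshold; False on timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        with open("/proc/meminfo") as f:  # Linux only
            if pending_writeback_kib(f.read()) <= threshold_kib:
                return True
        time.sleep(poll_s)
    return False
```

Calling wait_for_disk_quiet() between the staging copy and the Spark job would
hold the job back until the flush has (mostly) finished.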

David, by far your suggestion of reducing the shuffle partitions has
absolutely smashed the run times out of the park.  I've reduced the
shuffles down to the configured number of workers (in this case 6) and I am
seeing another 20% off my run times.  I have now hit a firm bottleneck
around network and getting the data from the database server (there was not
much difference between 30 shuffle partitions compared to 6).
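
For reference, a minimal sketch of how the setting can be applied when the
session is built.  spark.sql.shuffle.partitions is the standard Spark SQL
setting (it defaults to 200); the helper names and the app name are
hypothetical, and pyspark is imported lazily so the config helper can be
checked on its own.

```python
def shuffle_conf(worker_cores: int) -> dict:
    """Build the Spark SQL setting that matches shuffle partitions to cores."""
    if worker_cores < 1:
        raise ValueError("worker_cores must be at least 1")
    return {"spark.sql.shuffle.partitions": str(worker_cores)}


def build_session(app_name: str, worker_cores: int):
    """Create (or reuse) a SparkSession with the reduced shuffle partitions."""
    from pyspark.sql import SparkSession  # lazy import: needs pyspark installed

    builder = SparkSession.builder.appName(app_name)
    for key, value in shuffle_conf(worker_cores).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Example (hypothetical app name): spark = build_session("delta_capture", 6)
```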

Enrico, I have tried your suggestions and I can see some wins as well.  I
have to re-design and rebuild some of my solution to get them to work.
When this project was started, I was asked to provide single partitioned
parquet files (in the same sort of way you would see being outputted by
Pandas) and so my solution has been built around that.  Partitioning on
a field means that I can't deliver in this way.  Regardless, reading in the
parquet at the end, even with a filter clause in the statement, appears to
be much quicker than reading from the data frames.  (I now need to try and
convince the other stakeholders in the project that delivering files how
Spark intended is the correct method.)
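
To show what the partitioned delivery looks like on disk: partitionBy writes
one sub-directory per distinct value, named column=value (for example
_action=Insert/), with the part files inside.  The sketch below uses only the
standard library to list those files per action.  The function name is
hypothetical and it assumes a local filesystem path.

```python
from pathlib import Path


def partition_files(output_dir: str, column: str = "_action") -> dict:
    """Map each partition value to the sorted parquet file names beneath it."""
    prefix = column + "="
    result = {}
    for sub in sorted(Path(output_dir).iterdir()):
        # Skip marker files such as _SUCCESS and any unrelated directories.
        if sub.is_dir() and sub.name.startswith(prefix):
            value = sub.name[len(prefix):]
            result[value] = sorted(p.name for p in sub.glob("*.parquet"))
    return result
```

A consumer that still wants one file per action could pick the files up from
the matching sub-directory instead of filtering a single combined file.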

Thank you both for your input.

Best regards
Ashley



On Fri, Feb 14, 2020 at 4:44 AM Enrico Minack <m...@enrico.minack.dev>
wrote:

> Ashley,
>
> I want to suggest a few optimizations. The problem might go away but at
> least performance should improve.
> The freeze problems could have many causes; the Spark UI SQL pages and
> stage detail pages would be useful. You can send them privately, if you
> wish.
>
> 1. the repartition(1) should be replaced by coalesce(1). The former will
> shuffle all data, while the latter will read in the existing partitions and
> not shuffle them again.
> 2. Repartitioning to a single partition is discouraged, unless you can
> guarantee the data fit into one worker's memory.
> 3. You can compute Insert and Update in one go, so that you don't have to
> join with df_reference twice.
>
> df_actions = df_source_hashed.alias('a') \
>     .join(df_reference.alias('b'), pk_list, how="left") \
>     .withColumn('_action',
>                 when(col('b.hashkey').isNull(), 'Insert')
>                 .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
>     .select(col('_action'), *df_source_hashed) \
>     .dropDuplicates() \
>     .cache()
> # Unchanged rows match neither condition, so their _action is NULL.
>
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:
>
> inserts_count = df_actions.where(col('_action') == 'Insert').count()
> updates_count = df_actions.where(col('_action') == 'Update').count()
>
> And you can get rid of the union:
>
> df_output = df_actions.where(col('_action').isNotNull())
>
> If you have to write that output to parquet anyway, then you can get the
> count quickly from the parquet file if it is partitioned by the _action
> column (Spark then only looks into parquet's metadata to get the count, it
> does not read any row):
>
> df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
> df_output = sql_context.read.parquet('/path/to/output.parquet')
> inserts_count = df_output.where(col('_action') == 'Insert').count()
> updates_count = df_output.where(col('_action') == 'Update').count()
>
> These are all just sketches, but I am sure you get the idea.
>
> Enrico
>
>
> On 13.02.20 at 05:08, Ashley Hoff wrote:
>
> Hi,
>
> I am currently working on an app using PySpark to produce an insert and
> update daily delta capture, being outputted as Parquet.  This is running on
> an 8-core, 32 GB Linux server in standalone mode (set to 6 worker cores of
> 2 GB memory each) running Spark 2.4.3.
>
> This is being achieved by reading in data from a TSQL database, into a
> dataframe, which has a hash of all records appended to it and comparing it
> to a dataframe from yesterday's data (which has been saved also as
> parquet).
>
> As part of the monitoring and logging, I am trying to count the number of
> records for the respective actions.  Example code:
>
> df_source = spark_session.read.format('jdbc').....
> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>
> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', 
> *df_source.columns))) \
>             .cache()
>
> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>                     .select(lit('Insert').alias('_action'), 
> *df_source_hashed) \
>                     .dropDuplicates() \
>                     .cache()
> inserts_count = df_inserts.count()
>
> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), 
> pk_list, how="inner") \
>                         .select(lit('Update').alias('_action'), 
> *df_source_hashed) \
>                         .where(col('a.hashkey') != col('b.hashkey')) \
>                         .dropDuplicates() \
>                         .cache()
> updates_count = df_updates.count()
>
> df_output = df_inserts.union(df_updates)
>
> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>
> The above code runs as two concurrent instances via Python
> threading.Thread (this is to try and overcome the network bottleneck
> connecting to the database server).
>
> What I am finding is I am getting some very inconsistent behavior with the
> counts.  Occasionally, it appears that it will freeze up on a count
> operation for a few minutes and quite often that specific data frame will
> have zero records in it.  According to the DAG (which I am not 100% sure
> how to read) the following is the processing flow:
>
> Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0  =>
> WholeStageCodegen/MapPartitionsRDD [75]count at
> NativeMethodAccessorImpl.java:0  => InMemoryTableScan/MapPartitionsRDD
> [78]count at NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count
> at NativeMethodAccessorImpl.java:0 => WholeStageCodegen/MapPartitionsRDD
> [80]count at NativeMethodAccessorImpl.java:0 => Exchange/MapPartitionsRDD
> [81]count at NativeMethodAccessorImpl.java:0
>
> The other observation I have made is that if I remove the counts from the
> data frame operations and instead open the outputted parquet file and
> count using a
> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") ==
> "Insert").count()` command, I am reducing my run-times by around 20 to
> 30%.  In my feeble mind, opening up the outputs and re-reading them seems
> counter-intuitive.
>
> Is anyone able to give me some guidance on why or how to ensure that I am
> doing the above as efficiently as possible?
>
> Best Regards
> Ashley
>
>
>

-- 
Kustoms On Silver <https://www.facebook.com/KustomsOnSilver>
