Re: [pyspark 2.3+] read/write huge data with smaller block size (128MB per block)

2020-06-19 Thread Rishi Shah
Thanks Sean! To combat the skew I do have another column I repartition by, and
that has worked well (like below). However, in the image I attached to my
original email it looks like 2 tasks processed nothing - am I reading the
Spark UI task table right? All 4 dates have data - 2 dates have ~200MB and the
other 2 have ~800MB... This was just a test run to check the behavior.
Shouldn't I see all 4 tasks with some output rows?

df.repartition('file_date',
'part_col').write.partitionBy('file_date').parquet(PATH)
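A quick way to cross-check what the Spark UI task table shows is to look at the
per-date row counts directly (a sketch, reusing the df and file_date column from
the snippet above):

# count rows per file_date to confirm which dates are heavy vs. light
df.groupBy('file_date').count().orderBy('file_date').show()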


On Fri, Jun 19, 2020 at 9:38 AM Sean Owen  wrote:

> Yes you'll generally get 1 partition per block, and 1 task per partition.
> The amount of RAM isn't directly relevant; it's not loaded into memory.
> But you may nevertheless get some improvement with larger partitions /
> tasks, though typically only if your tasks are very small and very fast
> right now (completing in a few seconds).
> You can use minSplitSize to encourage some RDD APIs to choose larger
> partitions, but not in the DF API.
> Instead you can try coalescing to a smaller number of partitions, without
> a shuffle (the shuffle will probably negate any benefit).
>
> However what I see here is different still -- you have serious data skew
> because you partitioned by date, and I suppose some dates have lots of
> data, some have almost none.
>
>
> On Fri, Jun 19, 2020 at 12:17 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have about 10TB of parquet data on S3, where data files have 128MB
>> sized blocks. Spark by default picks up one block per task, even though
>> every task within an executor has at least 1.8GB of memory. Isn't that
>> wasteful? Is there any way to speed up this processing? Is there a way to
>> force tasks to pick up more files that sum up to a certain block size, or
>> will Spark always process one block per task? Basically, is there an
>> override to make sure Spark tasks read larger block(s)?
>>
>> Also, as seen in the attached image, while writing 4 files (partitionBy
>> file_date, one file per partition), 4 threads are active but two of them
>> seem to be doing nothing, and the other 2 threads have taken over the
>> writing for all 4 files. Shouldn't each of the 4 tasks pick up one file?
>>
>> For this example, assume df has 4 file_dates' worth of data.
>>
>> df.repartition('file_date').write.partitionBy('file_date').parquet(PATH)
>>
>> Screen Shot 2020-06-18 at 2.01.53 PM.png (126K)
>> 
>>
>> Any suggestions/feedback helps, appreciate it!
>> --
>> Regards,
>>
>> Rishi Shah
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Regards,

Rishi Shah


Re: [pyspark 2.3+] read/write huge data with smaller block size (128MB per block)

2020-06-19 Thread Sean Owen
Yes you'll generally get 1 partition per block, and 1 task per partition.
The amount of RAM isn't directly relevant; it's not loaded into memory. But
you may nevertheless get some improvement with larger partitions / tasks,
though typically only if your tasks are very small and very fast right now
(completing in a few seconds).
You can use minSplitSize to encourage some RDD APIs to choose larger
partitions, but not in the DF API.
Instead you can try coalescing to a smaller number of partitions, without a
shuffle (the shuffle will probably negate any benefit).
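
A minimal sketch of the coalesce approach (the path and the divisor below are
illustrative, not from this thread):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('s3://bucket/path')   # roughly 1 partition per 128MB block
# merge several input splits into each task without triggering a shuffle
df_fewer = df.coalesce(max(1, df.rdd.getNumPartitions() // 4))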

However what I see here is different still -- you have serious data skew
because you partitioned by date, and I suppose some dates have lots of
data, some have almost none.
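
If there is no natural second column to spread the heavy dates with, one common
variant is a random salt column (a sketch only; the salt column and N are
hypothetical, while df and PATH follow the snippets in this thread):

from pyspark.sql import functions as F

N = 8  # hypothetical target number of files per file_date
salted = df.withColumn('salt', (F.rand() * N).cast('int'))
salted.repartition('file_date', 'salt').write.partitionBy('file_date').parquet(PATH)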


On Fri, Jun 19, 2020 at 12:17 AM Rishi Shah 
wrote:

> Hi All,
>
> I have about 10TB of parquet data on S3, where data files have 128MB sized
> blocks. Spark by default picks up one block per task, even though every
> task within an executor has at least 1.8GB of memory. Isn't that wasteful?
> Is there any way to speed up this processing? Is there a way to force tasks
> to pick up more files that sum up to a certain block size, or will Spark
> always process one block per task? Basically, is there an override to make
> sure Spark tasks read larger block(s)?
>
> Also, as seen in the attached image, while writing 4 files (partitionBy
> file_date, one file per partition), 4 threads are active but two of them
> seem to be doing nothing, and the other 2 threads have taken over the
> writing for all 4 files. Shouldn't each of the 4 tasks pick up one file?
>
> For this example, assume df has 4 file_dates' worth of data.
>
> df.repartition('file_date').write.partitionBy('file_date').parquet(PATH)
>
> Screen Shot 2020-06-18 at 2.01.53 PM.png (126K)
> 
>
> Any suggestions/feedback helps, appreciate it!
> --
> Regards,
>
> Rishi Shah
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org