Did you check this, i.e. how many partitions there are and the count of records it shows?

// count of records per Spark partition
        import org.apache.spark.sql.functions.spark_partition_id
        df.groupBy(spark_partition_id()).count().show()



Are you getting the same number of parquet files?
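
If you want to verify, something along these lines can count the output parquet
files (just a sketch; it assumes a SparkSession named spark and that output_path
is the same path passed to parquet(output_path)):

        import org.apache.hadoop.fs.{FileSystem, Path}
        val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
        val files = fs.listFiles(new Path(output_path), true) // recursive listing
        var parquetCount = 0
        while (files.hasNext) {
          if (files.next().getPath.getName.endsWith(".parquet")) parquetCount += 1
        }
        println(s"parquet files written: $parquetCount")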

You can gradually increase the sample size.
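
For example (a rough sketch; the sample fractions are arbitrary, and output_path
and the partition columns are taken from the thread below):

        Seq(0.01, 0.1, 0.5).foreach { fraction =>
          df.sample(withReplacement = false, fraction)
            .write
            .partitionBy("appname", "year", "month", "day", "hour")
            .mode("overwrite")
            .parquet(s"$output_path/sample_$fraction")
        }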

On Fri, 8 Mar 2019, 14:17 JF Chen, <darou...@gmail.com> wrote:

> I checked my partitionBy method again; it's partitionBy(appname, year,
> month, day, hour), and the number of partitions for appname is much larger
> than for year, month, day, and hour. My Spark Streaming app runs
> every 5 minutes, so year, month, day, and hour should be the same most of the
> time.
> So will the number of appname partitions affect the writing efficiency?
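>
> For reference, the repartition-before-write idea from that article looks
> roughly like this on my columns (just a sketch):
>
>         import org.apache.spark.sql.functions.col
>         df.repartition(col("appname"), col("year"), col("month"), col("day"), col("hour"))
>           .write
>           .partitionBy("appname", "year", "month", "day", "hour")
>           .parquet(output_path)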
>
> Regard,
> Junfeng Chen
>
>
> On Thu, Mar 7, 2019 at 4:21 PM JF Chen <darou...@gmail.com> wrote:
>
>> Yes, I agree.
>>
>> From the Spark UI I can confirm the data is not skewed. There is only about
>> 100MB for each task, yet most tasks take several seconds to write the
>> data to HDFS while some tasks take minutes.
>>
>> Regard,
>> Junfeng Chen
>>
>>
>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>
>>> Hi JF,
>>> Yes, first we should know the actual number of partitions the dataframe has
>>> and the count of records in each. Then we should try to spread the data evenly
>>> across all partitions.
>>> It is always better to have num of partitions = N * num of executors.
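>>>
>>> For example (just a sketch; the multiplier n is arbitrary and
>>> spark.executor.instances may be set differently in your deployment):
>>>
>>>         val sc = spark.sparkContext
>>>         val numExecutors = sc.getConf.getInt("spark.executor.instances", 1)
>>>         val n = 3 // hypothetical multiplier
>>>         val balanced = df.repartition(n * numExecutors)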
>>>
>>>
>>>   "But the sequence of columns in  partitionBy  decides the
>>> directory  hierarchy structure. I hope the sequence of columns not change"
>>> , this is correct.
>>> Hence sometimes we should go with bigger number first then lesser ....
>>> try this ..i.e. more parent directories and less child directories. Tweet
>>> around it and try.
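>>>
>>> As an illustration of how the column order maps to the directory layout (only
>>> the layout changes, not the data; the paths are schematic):
>>>
>>>         df.write.partitionBy("column_a", "column_b", "column_c").parquet(output_path)
>>>         // => output_path/column_a=.../column_b=.../column_c=.../part-....parquet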
>>>
>>> "some tasks in write hdfs stage cost much more time than others" may be
>>> data is skewed, need to  distrube them evenly for all partitions.
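>>>
>>> A quick way to check for skew on the partitioning columns (a sketch on
>>> column_a; repeat for the other columns):
>>>
>>>         import org.apache.spark.sql.functions.desc
>>>         df.groupBy("column_a").count().orderBy(desc("count")).show(20, truncate = false)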
>>>
>>> ~Shyam
>>>
>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen <darou...@gmail.com> wrote:
>>>
>>>> Hi Shyam
>>>> Thanks for your reply.
>>>> You mean that after knowing the number of partitions for column_a, column_b,
>>>> and column_c, the sequence of columns in partitionBy should match the order
>>>> of the partition counts of columns a, b, and c?
>>>> But the sequence of columns in partitionBy decides the
>>>> directory hierarchy structure. I hope the sequence of columns does not change.
>>>>
>>>> And I found one more strange thing: some tasks in the write-to-HDFS stage
>>>> cost much more time than others, even though the amount of data written is
>>>> similar. How can I solve it?
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>>
>>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>>>
>>>>> Hi JF ,
>>>>>  Try to execute it before df.write....
>>>>>
>>>>> // count of records per Spark partition
>>>>>         import org.apache.spark.sql.functions.spark_partition_id
>>>>>         df.groupBy(spark_partition_id()).count().show()
>>>>>
>>>>> You will come to know how data has been partitioned inside df.
>>>>>
>>>>> A small trick we can apply here with partitionBy(column_a, column_b,
>>>>> column_c):
>>>>> make sure you have (column_a partitions) > (column_b
>>>>> partitions) > (column_c partitions).
>>>>>
>>>>> Try this.
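>>>>>
>>>>> For instance, a quick cardinality check before fixing the order (just a
>>>>> sketch; column_a/b/c stand in for your real column names):
>>>>>
>>>>>         Seq("column_a", "column_b", "column_c").foreach { c =>
>>>>>           println(s"$c -> " + df.select(c).distinct().count() + " distinct values")
>>>>>         }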
>>>>>
>>>>> Regards,
>>>>> Shyam
>>>>>
>>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen <darou...@gmail.com> wrote:
>>>>>
>>>>>> I am trying to write data in a dataset to HDFS via df.write.partitionBy
>>>>>> (column_a, column_b, column_c).parquet(output_path).
>>>>>> However, it takes several minutes to write only hundreds of MB of data
>>>>>> to HDFS.
>>>>>> According to this article
>>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>>> adding a repartition before the write should work. But if there is
>>>>>> data skew, some tasks may take much longer than average, so the job still
>>>>>> takes a long time.
>>>>>> How can I solve this problem? Thanks in advance!
>>>>>>
>>>>>>
>>>>>> Regard,
>>>>>> Junfeng Chen
>>>>>>
>>>>>
