But now I have another question: how can I determine which data node a Spark
task is writing to? It's really important for digging into the problem.
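
One way I can at least see, after the fact, which data nodes hold the written
blocks is something like this sketch (assuming spark is the active
SparkSession; the path is a placeholder):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    // recursively list the files under the (placeholder) output path
    val files = fs.listFiles(new Path("/path/to/output"), true)

    while (files.hasNext) {
      val status = files.next()
      // each block location lists the datanode hosts holding a replica
      status.getBlockLocations.foreach { loc =>
        println(s"${status.getPath} -> ${loc.getHosts.mkString(", ")}")
      }
    }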

Regard,
Junfeng Chen


On Thu, Mar 14, 2019 at 2:26 PM Shyam P <shyamabigd...@gmail.com> wrote:

> cool.
>
> On Tue, Mar 12, 2019 at 9:08 AM JF Chen <darou...@gmail.com> wrote:
>
>> Hi
>> Finally I found the reason...
>> It was caused by long GC pauses on some datanodes. After receiving the
>> data from the executors, a data node stuck in a long GC cannot report its
>> blocks to the namenode, so the write takes a long time.
>> Now that I have decommissioned the broken data nodes, my Spark job runs
>> well.
>> I am trying to increase the heap size of the data nodes to check whether
>> that resolves the problem.
>>
>> Regard,
>> Junfeng Chen
>>
>>
>> On Fri, Mar 8, 2019 at 8:54 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>
>>> Did you check this, i.e. how many partitions there are and the count of
>>> records it shows?
>>>
>>> // count of records per partition_id
>>>         import org.apache.spark.sql.functions.spark_partition_id
>>>         df.groupBy(spark_partition_id()).count().show()
>>>
>>>
>>>
>>> Are you getting the same number of parquet files?
>>>
>>> Then gradually increase the sample size.
>>>
>>> On Fri, 8 Mar 2019, 14:17 JF Chen, <darou...@gmail.com> wrote:
>>>
>>>> I checked my partitionBy method again. It is partitionBy(appname, year,
>>>> month, day, hour), and the number of partitions for appname is much larger
>>>> than for year, month, day, and hour. My Spark Streaming app runs every 5
>>>> minutes, so year, month, day, and hour should be the same most of the time.
>>>> So will the number of appname partitions affect the writing efficiency?
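>>>>
>>>> As a quick check on whether a few appname values dominate, a sketch like
>>>> this shows the record count per appname:
>>>>
>>>>     import org.apache.spark.sql.functions.desc
>>>>
>>>>     df.groupBy("appname").count().orderBy(desc("count")).show(false)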
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>>
>>>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen <darou...@gmail.com> wrote:
>>>>
>>>>> Yes, I agree.
>>>>>
>>>>> From the Spark UI I can confirm the data is not skewed. There is only
>>>>> about 100MB for each task, and while most tasks take several seconds to
>>>>> write the data to HDFS, some tasks take minutes.
>>>>>
>>>>> Regard,
>>>>> Junfeng Chen
>>>>>
>>>>>
>>>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P <shyamabigd...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi JF,
>>>>>> Yes, first we should know the actual number of partitions the dataframe
>>>>>> has and the count of records in each. Accordingly, we should try to have
>>>>>> the data spread evenly across all partitions.
>>>>>> It is always better to have Num of partitions = N * Num of executors.
>>>>>>
>>>>>>
>>>>>>   "But the sequence of columns in partitionBy decides the
>>>>>> directory hierarchy structure. I hope the sequence of columns does not
>>>>>> change", this is correct.
>>>>>> Hence sometimes we should go with the bigger number first and then the
>>>>>> lesser ones, i.e. more parent directories and fewer child directories.
>>>>>> Tweak it and try.
>>>>>>
>>>>>> "some tasks in the write hdfs stage cost much more time than others": the
>>>>>> data may be skewed and needs to be distributed evenly across all
>>>>>> partitions.
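>>>>>>
>>>>>> A rough sketch of that repartitioning suggestion (N = 4 and the config
>>>>>> default are only examples):
>>>>>>
>>>>>>     // Num of partitions = N * Num of executors, with N = 4 as an example
>>>>>>     val numExecutors = spark.conf.get("spark.executor.instances", "4").toInt
>>>>>>     val targetPartitions = 4 * numExecutors
>>>>>>
>>>>>>     // round-robin repartition spreads the records evenly across partitions
>>>>>>     val balanced = df.repartition(targetPartitions)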
>>>>>>
>>>>>> ~Shyam
>>>>>>
>>>>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen <darou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Shyam
>>>>>>> Thanks for your reply.
>>>>>>> You mean that after knowing the partition counts of column_a, column_b,
>>>>>>> and column_c, the sequence of columns in partitionBy should follow the
>>>>>>> order of the partition counts of columns a, b and c?
>>>>>>> But the sequence of columns in partitionBy decides the directory
>>>>>>> hierarchy structure. I hope the sequence of columns does not change.
>>>>>>>
>>>>>>> And I found one more strange thing: some tasks in the write-to-HDFS
>>>>>>> stage cost much more time than others, even though the amount of data
>>>>>>> written is similar. How can I solve it?
>>>>>>>
>>>>>>> Regard,
>>>>>>> Junfeng Chen
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P <shyamabigd...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi JF,
>>>>>>>> Try to execute this before df.write...
>>>>>>>>
>>>>>>>> // count of records per partition_id
>>>>>>>>         import org.apache.spark.sql.functions.spark_partition_id
>>>>>>>>         df.groupBy(spark_partition_id()).count().show()
>>>>>>>>
>>>>>>>> You will come to know how data has been partitioned inside df.
>>>>>>>>
>>>>>>>> A small trick we can apply here with partitionBy(column_a,
>>>>>>>> column_b, column_c):
>>>>>>>> make sure you have (column_a partitions) > (column_b
>>>>>>>> partitions) > (column_c partitions).
>>>>>>>>
>>>>>>>> Try this.
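>>>>>>>>
>>>>>>>> For example, a quick sketch to compare the cardinality of each column
>>>>>>>> before choosing the order (column_a/b/c are the placeholders above):
>>>>>>>>
>>>>>>>>     import org.apache.spark.sql.functions.countDistinct
>>>>>>>>
>>>>>>>>     df.select(
>>>>>>>>       countDistinct("column_a").as("distinct_a"),
>>>>>>>>       countDistinct("column_b").as("distinct_b"),
>>>>>>>>       countDistinct("column_c").as("distinct_c")
>>>>>>>>     ).show()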
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Shyam
>>>>>>>>
>>>>>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen <darou...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am trying to write the data in a dataset to HDFS via df.write.
>>>>>>>>> partitionBy(column_a, column_b, column_c).parquet(output_path).
>>>>>>>>> However, it takes several minutes to write only hundreds of MB of
>>>>>>>>> data to HDFS.
>>>>>>>>> According to this article
>>>>>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>>>>>> adding a repartition before the write should help. But if there
>>>>>>>>> is data skew, some tasks may take much longer than average, so the
>>>>>>>>> write would still be slow.
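>>>>>>>>>
>>>>>>>>> The repartition approach from that answer would look roughly like this
>>>>>>>>> sketch (column names and output_path are placeholders):
>>>>>>>>>
>>>>>>>>>     import org.apache.spark.sql.functions.col
>>>>>>>>>
>>>>>>>>>     // repartition by the same columns used in partitionBy, so each
>>>>>>>>>     // output directory is written by a single task instead of many
>>>>>>>>>     df.repartition(col("column_a"), col("column_b"), col("column_c"))
>>>>>>>>>       .write
>>>>>>>>>       .partitionBy("column_a", "column_b", "column_c")
>>>>>>>>>       .parquet(output_path)
>>>>>>>>>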
>>>>>>>>> How can I solve this problem? Thanks in advance!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regard,
>>>>>>>>> Junfeng Chen
>>>>>>>>>
>>>>>>>>
