But now I have another question: how to determine which data node the spark task is writing to? It's really important for digging into the problem.
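One possible way to narrow this down (a sketch only, not something from this thread; it assumes spark is the active SparkSession): log the host each write task runs on. With the default HDFS block placement policy, the first replica is written to the datanode co-located with the writing executor, so the task's host is usually the datanode that receives the write.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Log the executor host and duration of every finished task.
// Filter the output for the stage id of the parquet write stage
// (visible in the Spark UI) to see which hosts the slow tasks ran on.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    println(s"stage=${taskEnd.stageId} task=${info.taskId} " +
      s"host=${info.host} duration=${info.duration}ms")
  }
})

To map the written files back to actual datanodes afterwards, hdfs fsck <output_path> -files -blocks -locations lists the replica locations of each block.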
Regard,
Junfeng Chen

On Thu, Mar 14, 2019 at 2:26 PM Shyam P <shyamabigd...@gmail.com> wrote:

> cool.
>
> On Tue, Mar 12, 2019 at 9:08 AM JF Chen <darou...@gmail.com> wrote:
>
>> Hi
>> Finally I found the reason...
>> It was caused by long GC pauses on some datanodes. After receiving the
>> data from the executors, a data node with long GC cannot report blocks
>> to the namenode, so the writing process takes a long time.
>> Now I have decommissioned the broken data nodes, and my spark runs well.
>> I am trying to increase the heap size of the data nodes to check if it
>> resolves the problem.
>>
>> Regard,
>> Junfeng Chen
>>
>> On Fri, Mar 8, 2019 at 8:54 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>
>>> Did you check this: how many partitions and what count of records does
>>> it show?
>>>
>>> //count by partition_id
>>> import org.apache.spark.sql.functions.spark_partition_id
>>> df.groupBy(spark_partition_id).count.show()
>>>
>>> Are you getting the same number of parquet files?
>>>
>>> You can gradually increase the sample size.
>>>
>>> On Fri, 8 Mar 2019, 14:17 JF Chen, <darou...@gmail.com> wrote:
>>>
>>>> I checked my partitionBy method again; it's partitionBy(appname, year,
>>>> month, day, hour), and the number of partitions of appname is much
>>>> larger than the number of partitions of year, month, day, and hour.
>>>> My spark streaming app runs every 5 minutes, so year, month, day, and
>>>> hour should be the same most of the time.
>>>> So will the number of appname partitions affect the writing efficiency?
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen <darou...@gmail.com> wrote:
>>>>
>>>>> Yes, I agree.
>>>>>
>>>>> From the spark UI I can confirm the data is not skewed. There is only
>>>>> about 100MB for each task, where most tasks take several seconds to
>>>>> write the data to hdfs, and some tasks take minutes.
>>>>>
>>>>> Regard,
>>>>> Junfeng Chen
>>>>>
>>>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>>>>
>>>>>> Hi JF,
>>>>>> Yes, first we should know the actual number of partitions the
>>>>>> dataframe has and its count of records. Accordingly we should try to
>>>>>> have the data spread evenly across all partitions.
>>>>>> It is always better to have Num of partitions = N * Num of executors.
>>>>>>
>>>>>> "But the sequence of columns in partitionBy decides the directory
>>>>>> hierarchy structure. I hope the sequence of columns not change",
>>>>>> this is correct.
>>>>>> Hence sometimes we should go with the bigger number first, then the
>>>>>> lesser ones ... try this, i.e. more parent directories and fewer
>>>>>> child directories. Tweak around it and try.
>>>>>>
>>>>>> "some tasks in write hdfs stage cost much more time than others":
>>>>>> maybe the data is skewed; it needs to be distributed evenly across
>>>>>> all partitions.
>>>>>>
>>>>>> ~Shyam
>>>>>>
>>>>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen <darou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Shyam
>>>>>>> Thanks for your reply.
>>>>>>> You mean after knowing the number of partitions of column_a,
>>>>>>> column_b, column_c, the sequence of columns in partitionBy should
>>>>>>> match the order of the partition counts of columns a, b and c?
>>>>>>> But the sequence of columns in partitionBy decides the directory
>>>>>>> hierarchy structure. I hope the sequence of columns does not change.
>>>>>>>
>>>>>>> And I found one more strange thing: some tasks in the write-to-hdfs
>>>>>>> stage cost much more time than others, while the amount of data
>>>>>>> written is similar. How to solve it?
>>>>>>>
>>>>>>> Regard,
>>>>>>> Junfeng Chen
>>>>>>>
>>>>>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi JF,
>>>>>>>> Try to execute it before df.write....
>>>>>>>>
>>>>>>>> //count by partition_id
>>>>>>>> import org.apache.spark.sql.functions.spark_partition_id
>>>>>>>> df.groupBy(spark_partition_id).count.show()
>>>>>>>>
>>>>>>>> You will come to know how the data has been partitioned inside the df.
>>>>>>>>
>>>>>>>> A small trick we can apply here with partitionBy(column_a,
>>>>>>>> column_b, column_c): make sure you have (column_a partitions) >
>>>>>>>> (column_b partitions) > (column_c partitions).
>>>>>>>>
>>>>>>>> Try this.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Shyam
>>>>>>>>
>>>>>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen <darou...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am trying to write data in a dataset to hdfs via
>>>>>>>>> df.write.partitionBy(column_a, column_b, column_c).parquet(output_path)
>>>>>>>>> However, it takes several minutes to write only hundreds of MB of
>>>>>>>>> data to hdfs.
>>>>>>>>> From this article
>>>>>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>>>>>> adding a repartition before the write should work. But if there
>>>>>>>>> is data skew, some tasks may take much longer than average, which
>>>>>>>>> still costs a lot of time.
>>>>>>>>> How to solve this problem? Thanks in advance!
>>>>>>>>>
>>>>>>>>> Regard,
>>>>>>>>> Junfeng Chen
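
For reference, a sketch of the repartition-before-write idea from the linked Stack Overflow article, using the column names mentioned in the thread (a sketch only; whether it helps depends on how evenly the appname values are distributed):

import org.apache.spark.sql.functions.col

// Shuffle rows so that each output directory (appname/year/month/day/hour)
// is written by a bounded set of tasks, instead of every task writing a
// small file into every directory.
df.repartition(col("appname"), col("year"), col("month"), col("day"), col("hour"))
  .write
  .partitionBy("appname", "year", "month", "day", "hour")
  .parquet(output_path)

If a single appname holds most of the data, the skew simply moves into the task that writes it; adding a salt column (for example a small random bucket) to the repartition keys spreads that directory across several tasks, at the cost of more output files.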