You can check the "Executors" tab in the Spark UI screen...
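A minimal sketch along the same lines, assuming Spark 2.x and a live SparkContext named sc (when an executor runs on a data node, the first HDFS replica is normally written to that same node, so matching slow write tasks to their executor host usually points you at the data node in question):

// list executor hosts and how many tasks each is currently running,
// so slow write tasks from the stage page can be matched to a host
sc.statusTracker.getExecutorInfos.foreach { e =>
  println(s"${e.host()}:${e.port()} runningTasks=${e.numRunningTasks()}")
}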
On Fri, Mar 15, 2019 at 7:56 AM JF Chen <darou...@gmail.com> wrote:

> But now I have another question: how to determine which data node the
> Spark task is writing to? It's really important for diving into the
> problem.
>
> Regard,
> Junfeng Chen
>
>
> On Thu, Mar 14, 2019 at 2:26 PM Shyam P <shyamabigd...@gmail.com> wrote:
>
>> cool.
>>
>> On Tue, Mar 12, 2019 at 9:08 AM JF Chen <darou...@gmail.com> wrote:
>>
>>> Hi
>>> Finally I found the reason...
>>> It was caused by long GC pauses on some data nodes. After receiving
>>> the data from executors, a data node stuck in a long GC cannot report
>>> blocks to the namenode, so the writing process takes a long time.
>>> I have decommissioned the broken data nodes, and now my Spark job runs
>>> well.
>>> I am trying to increase the heap size of the data nodes to check
>>> whether it resolves the problem.
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>>
>>> On Fri, Mar 8, 2019 at 8:54 PM Shyam P <shyamabigd...@gmail.com> wrote:
>>>
>>>> Did you check this? How many partitions and how many records per
>>>> partition does it show?
>>>>
>>>> //count by partition_id
>>>> import org.apache.spark.sql.functions.spark_partition_id
>>>> df.groupBy(spark_partition_id).count.show()
>>>>
>>>> Are you getting the same number of parquet files?
>>>>
>>>> Then gradually increase the sample size.
>>>>
>>>> On Fri, 8 Mar 2019, 14:17 JF Chen, <darou...@gmail.com> wrote:
>>>>
>>>>> I checked my partitionBy method again. It is partitionBy(appname,
>>>>> year, month, day, hour), and the number of partitions for appname is
>>>>> much larger than for year, month, day, and hour. My Spark streaming
>>>>> app runs every 5 minutes, so year, month, day, and hour should be the
>>>>> same most of the time.
>>>>> So will the number of appname partitions affect the writing
>>>>> efficiency?
>>>>>
>>>>> Regard,
>>>>> Junfeng Chen
>>>>>
>>>>>
>>>>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen <darou...@gmail.com> wrote:
>>>>>
>>>>>> Yes, I agree.
>>>>>>
>>>>>> From the Spark UI I can confirm the data is not skewed. There is
>>>>>> only about 100MB for each task; most tasks take several seconds to
>>>>>> write the data to HDFS, but some tasks take minutes.
>>>>>>
>>>>>> Regard,
>>>>>> Junfeng Chen
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P <shyamabigd...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi JF,
>>>>>>> Yes, first we should know the actual number of partitions the
>>>>>>> dataframe has and their record counts. Accordingly we should try to
>>>>>>> have the data spread evenly across all partitions.
>>>>>>> It is always better to have num of partitions = N * num of
>>>>>>> executors.
>>>>>>>
>>>>>>> "But the sequence of columns in partitionBy decides the directory
>>>>>>> hierarchy structure. I hope the sequence of columns does not
>>>>>>> change", this is correct.
>>>>>>> Hence sometimes we should go with the bigger number first and the
>>>>>>> lesser ones after, i.e. more parent directories and fewer child
>>>>>>> directories. Tweak it and try.
>>>>>>>
>>>>>>> "some tasks in write hdfs stage cost much more time than others":
>>>>>>> maybe the data is skewed and needs to be distributed evenly across
>>>>>>> all partitions.
>>>>>>>
>>>>>>> ~Shyam
>>>>>>>
>>>>>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen <darou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Shyam
>>>>>>>> Thanks for your reply.
>>>>>>>> You mean that after knowing the partition counts of column_a,
>>>>>>>> column_b, and column_c, the sequence of columns in partitionBy
>>>>>>>> should follow the order of the partition counts of columns a, b,
>>>>>>>> and c?
>>>>>>>> But the sequence of columns in partitionBy decides the directory
>>>>>>>> hierarchy structure. I hope the sequence of columns does not
>>>>>>>> change.
>>>>>>>>
>>>>>>>> And I found one more strange thing: some tasks in the
>>>>>>>> write-to-HDFS stage cost much more time than others, even though
>>>>>>>> the amount of data written is similar. How to solve it?
>>>>>>>>
>>>>>>>> Regard,
>>>>>>>> Junfeng Chen
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P <shyamabigd...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi JF,
>>>>>>>>> Try to execute this before df.write...:
>>>>>>>>>
>>>>>>>>> //count by partition_id
>>>>>>>>> import org.apache.spark.sql.functions.spark_partition_id
>>>>>>>>> df.groupBy(spark_partition_id).count.show()
>>>>>>>>>
>>>>>>>>> You will come to know how the data has been partitioned inside
>>>>>>>>> the df.
>>>>>>>>>
>>>>>>>>> A small trick we can apply here with partitionBy(column_a,
>>>>>>>>> column_b, column_c):
>>>>>>>>> make sure you have (column_a partitions) > (column_b partitions)
>>>>>>>>> > (column_c partitions).
>>>>>>>>>
>>>>>>>>> Try this.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Shyam
>>>>>>>>>
>>>>>>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen <darou...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I am trying to write data in a dataset to HDFS via df.write.
>>>>>>>>>> partitionBy(column_a, column_b, column_c).parquet(output_path).
>>>>>>>>>> However, it costs several minutes to write only hundreds of MB
>>>>>>>>>> of data to HDFS.
>>>>>>>>>> From this article
>>>>>>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>>>>>>> adding a repartition before the write should work. But if there
>>>>>>>>>> is data skew, some tasks may take much longer than average,
>>>>>>>>>> which still costs much time.
>>>>>>>>>> How to solve this problem? Thanks in advance!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regard,
>>>>>>>>>> Junfeng Chen
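
For reference, a minimal sketch of the repartition-before-write idea discussed in the thread, assuming a DataFrame named df and a String output_path (the column names are just the placeholders used above): repartitioning on the same columns used in partitionBy sends all rows for a given partition value to one task, so each output directory ends up with a single file instead of one file from every task, at the cost of a shuffle and possible skew if a single value dominates.

import org.apache.spark.sql.functions.col

// shuffle so that each (column_a, column_b, column_c) combination
// is handled by a single task, then write one directory per combination
df.repartition(col("column_a"), col("column_b"), col("column_c"))
  .write
  .partitionBy("column_a", "column_b", "column_c")
  .parquet(output_path)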