Re: spark df.write.partitionBy run very slow

2019-03-11 Thread JF Chen
Hi
Finally I found the reason...
It was caused by long GC pauses on some datanodes. After receiving the data
from the executors, a datanode stuck in a long GC cannot report its blocks to
the namenode, so the write stage takes a long time.
I have now decommissioned the broken datanodes, and my Spark job runs well.
I am also trying to increase the datanode heap size to check whether that
resolves the problem.

Regard,
Junfeng Chen


On Fri, Mar 8, 2019 at 8:54 PM Shyam P  wrote:

> Did you check this? How many partitions does it show, and what is the record count in each?
>
> //count by partition_id
> import org.apache.spark.sql.functions.spark_partition_id
> df.groupBy(spark_partition_id()).count().show()
>
>
>
> Are you getting the same number of parquet files ?
>
> Try gradually increasing the sample size.
>
> On Fri, 8 Mar 2019, 14:17 JF Chen,  wrote:
>
>> I checked my partitionBy call again; it is partitionBy(appname, year,
>> month, day, hour), and the number of distinct values of appname is much
>> larger than that of year, month, day, and hour. My Spark Streaming app runs
>> every 5 minutes, so year, month, day, and hour should be the same most of
>> the time.
>> So will the number of appname partitions affect the writing efficiency?
>>
>> Regard,
>> Junfeng Chen
>>
>>
>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen  wrote:
>>
>>> Yes, I agree.
>>>
>>> From the Spark UI I can confirm the data is not skewed. Each task handles
>>> only about 100MB, yet most tasks take several seconds to write the data to
>>> HDFS while some take minutes.
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>>
>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P  wrote:
>>>
 Hi JF,
 Yes, first we should know the actual number of partitions the dataframe has
 and the record count in each. Accordingly, we should try to distribute the
 data evenly across all partitions.
 It is always better to have num of partitions = N * num of executors.


   "But the sequence of columns in  partitionBy  decides the
 directory  hierarchy structure. I hope the sequence of columns not change"
 , this is correct.
 Hence sometimes we should go with bigger number first then lesser 
 try this ..i.e. more parent directories and less child directories. Tweet
 around it and try.

 "some tasks in write hdfs stage cost much more time than others" may be
 data is skewed, need to  distrube them evenly for all partitions.

 ~Shyam

 On Wed, Mar 6, 2019 at 8:33 AM JF Chen  wrote:

> Hi Shyam
> Thanks for your reply.
> You mean that after knowing the number of partitions for column_a,
> column_b, and column_c, the sequence of columns in partitionBy should match
> the order of their partition counts?
> But the sequence of columns in partitionBy decides the directory hierarchy
> structure. I hope the sequence of columns does not change.
>
> And I found one more strange thing: some tasks in the write-to-HDFS stage
> cost much more time than others, even though the amount of data written is
> similar. How can I solve it?
>
> Regard,
> Junfeng Chen
>
>
> On Tue, Mar 5, 2019 at 3:05 PM Shyam P 
> wrote:
>
>> Hi JF,
>> Try to execute this before df.write:
>>
>> //count by partition_id
>> import org.apache.spark.sql.functions.spark_partition_id
>> df.groupBy(spark_partition_id()).count().show()
>>
>> This will show you how the data has been partitioned inside the df.
>>
>> A small trick we can apply here with partitionBy(column_a, column_b,
>> column_c):
>> make sure that (column_a partitions) > (column_b partitions) >
>> (column_c partitions).
>>
>> Try this.
>>
>> Regards,
>> Shyam
>>
>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen  wrote:
>>
>>> I am trying to write the data in a dataset to HDFS via
>>> df.write.partitionBy(column_a, column_b, column_c).parquet(output_path).
>>> However, it takes several minutes to write only hundreds of MB of data
>>> to HDFS.
>>> From this article, adding a repartition before the write should work. But
>>> if there is data skew, some tasks may take much longer than average, so
>>> the write still costs a lot of time.
>>> How can I solve this problem? Thanks in advance!
>>>
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>

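
A minimal sketch of the repartition-before-partitionBy approach discussed in
this thread, assuming the partition columns mentioned above (appname, year,
month, day, hour), a dataframe df, and an output_path variable; the code below
is illustrative and not taken from the thread itself:

// Repartition on the same columns used in partitionBy, so that each write
// task touches as few partition directories as possible.
import org.apache.spark.sql.functions.{col, spark_partition_id}

val repartitioned = df.repartition(
  col("appname"), col("year"), col("month"), col("day"), col("hour"))

// Optional: inspect how many records each Spark partition holds before writing.
repartitioned.groupBy(spark_partition_id()).count().show()

repartitioned.write
  .partitionBy("appname", "year", "month", "day", "hour")
  .parquet(output_path)

Repartitioning this way trades one shuffle for fewer, larger files per
partition directory, which is usually what makes the write stage faster.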

read json and write into parquet in executors

2019-03-11 Thread Lian Jiang
Hi,

In my Spark batch job:

step 1: the driver assigns a partition of the JSON file path list to each
executor.
step 2: each executor fetches its assigned JSON files from S3 and saves them
into HDFS.
step 3: the driver reads these JSON files into a dataframe and saves it as
parquet.

To improve performance by avoiding writing the JSON files to HDFS, I want to
change the workflow to:

step 1: the driver assigns a partition of the JSON file path list to each
executor.
step 2: each executor fetches its assigned JSON files from S3, merges the
JSON content in memory, and writes it directly to parquet. No need to write
JSON to HDFS.

However, I cannot create dataframes in executors. Is this improvement
feasible? Appreciate any help!
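
One common way to get this effect, not spelled out in the thread: since Spark
executors read input files themselves, the driver can point spark.read.json at
the S3 paths directly and write parquet in one pass, with no JSON staged on
HDFS and no dataframes created inside executors. A minimal sketch, assuming
s3a access is configured; the bucket, file names and output path below are
hypothetical placeholders:

// Hypothetical list of S3 paths; in practice this would be the same path list
// the driver currently distributes to executors.
val jsonPaths: Seq[String] = Seq(
  "s3a://my-bucket/input/part-0001.json",
  "s3a://my-bucket/input/part-0002.json")

// The driver only builds the plan; the executors fetch and parse the JSON
// files themselves, so nothing is written to HDFS as an intermediate step.
val df = spark.read.json(jsonPaths: _*)

df.write.mode("overwrite").parquet("hdfs:///output/merged-parquet")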


returning type of function that needs to be passed to method 'mapWithState'

2019-03-11 Thread shicheng31...@gmail.com
Hi all:
The `mapWithState` method in Spark Streaming requires you to pass in a
mapping function. This function maintains a state and should also return a
result, yet the final stateful result can already be obtained from the state
object.
So, what is the significance of the returned result?
I looked up the official API, and it does not specifically say what the
result is used for; it just gives a simple explanation, as follows:



// A mapping function that maintains an integer state and return a String
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
  // Use state.exists(), state.get(), state.update() and state.remove()
  // to manage state, and return the necessary string
}

val spec = StateSpec.function(mappingFunction).numPartitions(10)

val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
Can anyone help me with this problem? Thanks!



   


shicheng31...@gmail.com
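
For illustration, a filled-in version of the snippet quoted above (a sketch,
not taken from the official docs): whatever the mapping function returns
becomes an element of the resulting mapWithStateDStream, one per input record,
while the accumulated per-key state can be read separately via
stateSnapshots(). The surrounding streaming setup and keyValueDStream are
assumed to exist as in the quoted snippet.

import org.apache.spark.streaming.{State, StateSpec}

// Keeps a running sum per key in the state and returns a per-record string.
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  // This return value is what ends up in mapWithStateDStream, e.g. for
  // printing or joining per-record results downstream.
  Some(s"$key -> $sum")
}

val spec = StateSpec.function(mappingFunction _).numPartitions(10)

// keyValueDStream: DStream[(String, Int)], as in the quoted snippet.
val mapWithStateDStream = keyValueDStream.mapWithState(spec)

// Per-record mapped results: the mapping function's return values.
mapWithStateDStream.print()

// The latest state for every key, independent of the returned values.
mapWithStateDStream.stateSnapshots().print()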