Re: physical memory usage keep increasing for spark app on Yarn

2017-02-15 Thread Yang Cao
Hi Pavel!

Sorry for the late reply. I spent the last few days investigating this with a colleague. 
Here is my thinking: since Spark 1.2, Netty with off-heap memory is used to reduce 
GC during shuffle and cached-block transfer. In my case, when I increased the memory 
overhead far enough, I got a Max direct buffer exception instead. When Netty does the 
block transfer, there are five threads by default grabbing data chunks for the target 
executor, and in my situation a single chunk was too big to fit into the buffer, so GC 
won't help here. My final solution was to do another repartition before the 
repartition(1), creating roughly 10x more partitions than the original. That way each 
chunk Netty transfers is smaller. 
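
To make this concrete, here is a minimal sketch of the two-stage write described above 
(the partition count of 4000 is only an illustrative assumption, roughly 10x the 
original; dropDF, SaveMode and targetpath are the names from the code quoted further 
down in this thread):
// Shuffle into many small partitions first so each Netty chunk stays small,
// then collapse to a single output file in a second shuffle.
val manyParts = dropDF.repartition(4000)
manyParts.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)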

I also want to say that it's not a good choice to repartition a big dataset into a 
single file. Such an extremely unbalanced layout is kind of a waste of your compute 
resources. 

I don't know whether my explanation is right. Please correct me if you find any 
issue. Thanks!

Best,
Yang
>  On January 23, 2017, at 18:03, Pavel Plotnikov <pavel.plotni...@team.wrike.com> wrote:
> 
> Hi Yang!
> 
> I don't know exactly why this happens, but I think either GC can't keep up, or the 
> size of the data plus the additional objects created during the computation is too 
> big for the executor. 
> And I found this problem appears only if you do some data manipulation first. You 
> can cache your data first, and after that write it out in one partition.
> For example  
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.cache()
> or
> dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)
> val dropDF = spark.read.parquet(temppath)
> and then
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> Best,
> 
> On Sun, Jan 22, 2017 at 12:31 PM Yang Cao <cybea...@gmail.com> wrote:
> Also, do you know why this happens? 
> 
>> On January 20, 2017, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com> wrote:
>> 
> 
>> Hi Yang,
>> I have faced the same problem on Mesos, and to circumvent it I usually increase 
>> the number of partitions. In the last step of your code you reduce the number of 
>> partitions to 1; try setting a bigger value, maybe it will solve the problem.
>> 
>> Cheers,
>> Pavel
>> 
>> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com> wrote:
>> Hi all,
>> 
>> I am running a Spark application in YARN-client mode with 6 executors (4 cores 
>> each, executor memory = 6G, overhead = 4G; Spark version: 1.6.3 / 2.1.0). I find 
>> that my executor memory keeps increasing until the executor gets killed by the 
>> node manager, with a message telling me to boost 
>> spark.yarn.executor.memoryOverhead. I know that this parameter mainly controls 
>> the size of memory allocated off-heap, but I don't know when and how the Spark 
>> engine uses this part of memory. Also, increasing that part of memory does not 
>> always solve my problem: sometimes it works, sometimes not. It tends to be 
>> useless when the input data is large.
>> 
>> FYI, my app's logic is quite simple. It is meant to combine the small files 
>> generated in one single day (one directory per day) into a single file and write 
>> it back to HDFS. Here is the core code:
>> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
>> ${ts.day}").coalesce(400)
>> val dropDF = 
>> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>> The source may have hundreds to thousands of partitions, and the total Parquet 
>> data is around 1 to 5 GB. I also find that in the stage that shuffle-reads data 
>> from different machines, the shuffle read size is about 4 times larger than the 
>> input size, which is weird, or follows some principle I don't know. 
>> 
>> Anyway, I have done some searching on this problem myself. Some articles say it 
>> is about the direct buffer memory (which I don't set myself). Some articles say 
>> that people solve it with more frequent full GC. I also found someone on SO with 
>> a very similar situation: 
>> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
>> This guy claimed that it's a bug with Parquet, but a comment questioned him.

Re: physical memory usage keep increasing for spark app on Yarn

2017-01-22 Thread Yang Cao
Also, do you know why this happens? 
> On January 20, 2017, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com> wrote:
> 
> Hi Yang,
> I have faced the same problem on Mesos, and to circumvent it I usually increase 
> the number of partitions. In the last step of your code you reduce the number of 
> partitions to 1; try setting a bigger value, maybe it will solve the problem.
> 
> Cheers,
> Pavel
> 
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com> wrote:
> Hi all,
> 
> I am running a Spark application in YARN-client mode with 6 executors (4 cores 
> each, executor memory = 6G, overhead = 4G; Spark version: 1.6.3 / 2.1.0). I find 
> that my executor memory keeps increasing until the executor gets killed by the 
> node manager, with a message telling me to boost spark.yarn.executor.memoryOverhead. 
> I know that this parameter mainly controls the size of memory allocated off-heap, 
> but I don't know when and how the Spark engine uses this part of memory. Also, 
> increasing that part of memory does not always solve my problem: sometimes it 
> works, sometimes not. It tends to be useless when the input data is large.
> 
> FYI, my app's logic is quite simple. It is meant to combine the small files 
> generated in one single day (one directory per day) into a single file and write 
> it back to HDFS. Here is the core code:
> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
> ${ts.day}").coalesce(400)
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> The source may have hundreds to thousands of partitions, and the total Parquet 
> data is around 1 to 5 GB. I also find that in the stage that shuffle-reads data 
> from different machines, the shuffle read size is about 4 times larger than the 
> input size, which is weird, or follows some principle I don't know. 
> 
> Anyway, I have done some searching on this problem myself. Some articles say it is 
> about the direct buffer memory (which I don't set myself). Some articles say that 
> people solve it with more frequent full GC. I also found someone on SO with a very 
> similar situation: 
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> This guy claimed that it's a bug with Parquet, but a comment questioned him. 
> People on this mailing list may also have received an email a few hours ago from 
> blondowski, who described this problem while writing JSON: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
> 
> So it looks like a common question across different output formats. I hope someone 
> with experience with this problem could explain this issue: why it happens, and 
> what a reliable way to solve it is. 
> 
> Best,
> 
> 



Re: physical memory usage keep increasing for spark app on Yarn

2017-01-22 Thread Yang Cao
Hi,
Thank you for your suggestion. As I understand it, if I set a bigger number I won't 
get the output as a single file, right? My task is designed to combine all the small 
files from one day into one big Parquet file. Thanks again.

Best,
> On January 20, 2017, at 18:23, Pavel Plotnikov <pavel.plotni...@team.wrike.com> wrote:
> 
> Hi Yang,
> I have faced the same problem on Mesos, and to circumvent it I usually increase 
> the number of partitions. In the last step of your code you reduce the number of 
> partitions to 1; try setting a bigger value, maybe it will solve the problem.
> 
> Cheers,
> Pavel
> 
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao <cybea...@gmail.com> wrote:
> Hi all,
> 
> I am running a Spark application in YARN-client mode with 6 executors (4 cores 
> each, executor memory = 6G, overhead = 4G; Spark version: 1.6.3 / 2.1.0). I find 
> that my executor memory keeps increasing until the executor gets killed by the 
> node manager, with a message telling me to boost spark.yarn.executor.memoryOverhead. 
> I know that this parameter mainly controls the size of memory allocated off-heap, 
> but I don't know when and how the Spark engine uses this part of memory. Also, 
> increasing that part of memory does not always solve my problem: sometimes it 
> works, sometimes not. It tends to be useless when the input data is large.
> 
> FYI, my app's logic is quite simple. It is meant to combine the small files 
> generated in one single day (one directory per day) into a single file and write 
> it back to HDFS. Here is the core code:
> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
> ${ts.day}").coalesce(400)
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
> The source may have hundreds to thousands of partitions, and the total Parquet 
> data is around 1 to 5 GB. I also find that in the stage that shuffle-reads data 
> from different machines, the shuffle read size is about 4 times larger than the 
> input size, which is weird, or follows some principle I don't know. 
> 
> Anyway, I have done some searching on this problem myself. Some articles say it is 
> about the direct buffer memory (which I don't set myself). Some articles say that 
> people solve it with more frequent full GC. I also found someone on SO with a very 
> similar situation: 
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> This guy claimed that it's a bug with Parquet, but a comment questioned him. 
> People on this mailing list may also have received an email a few hours ago from 
> blondowski, who described this problem while writing JSON: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
> 
> So it looks like a common question across different output formats. I hope someone 
> with experience with this problem could explain this issue: why it happens, and 
> what a reliable way to solve it is. 
> 
> Best,
> 
> 



physical memory usage keep increasing for spark app on Yarn

2017-01-20 Thread Yang Cao
Hi all,

I am running a Spark application in YARN-client mode with 6 executors (4 cores each, 
executor memory = 6G, overhead = 4G; Spark version: 1.6.3 / 2.1.0). I find that my 
executor memory keeps increasing until the executor gets killed by the node manager, 
with a message telling me to boost spark.yarn.executor.memoryOverhead. I know that 
this parameter mainly controls the size of memory allocated off-heap, but I don't 
know when and how the Spark engine uses this part of memory. Also, increasing that 
part of memory does not always solve my problem: sometimes it works, sometimes not. 
It tends to be useless when the input data is large.
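
For reference, this is roughly how those sizes map to configuration (a minimal sketch, 
not the actual launch code; the instance/core/memory values simply mirror the numbers 
above, and the overhead is given in MB):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn-client")
  .set("spark.executor.instances", "6")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "6g")
  .set("spark.yarn.executor.memoryOverhead", "4096")  // off-heap allowance, in MB
val sc = new SparkContext(conf)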

FYI, my app's logic is quite simple. It is meant to combine the small files generated 
in one single day (one directory per day) into a single file and write it back to 
HDFS. Here is the core code:
val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
${ts.day}").coalesce(400)
val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
The source may have hundreds to thousands of partitions, and the total Parquet data 
is around 1 to 5 GB. I also find that in the stage that shuffle-reads data from 
different machines, the shuffle read size is about 4 times larger than the input 
size, which is weird, or follows some principle I don't know. 

Anyway, I have done some searching on this problem myself. Some articles say it is 
about the direct buffer memory (which I don't set myself). Some articles say that 
people solve it with more frequent full GC. I also found someone on SO with a very 
similar situation: 
http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
This guy claimed that it's a bug with Parquet, but a comment questioned him. People 
on this mailing list may also have received an email a few hours ago from blondowski, 
who described this problem while writing JSON: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none

So it looks like a common question across different output formats. I hope someone 
with experience with this problem could explain this issue: why it happens, and what 
a reliable way to solve it is. 

Best,




filter push down on har file

2017-01-16 Thread Yang Cao
Hi,

My team just archived last year's Parquet files. I wonder whether the filter pushdown 
optimization still works when I read the data through "har:///path/to/data/"? Thanks.
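
For anyone checking the same thing, a minimal sketch of how the pushdown can be 
verified (the column name and filter value are made-up placeholders; I am assuming a 
Spark 1.6 SQLContext named sqlContext):
val df = sqlContext.read.parquet("har:///path/to/data/")
// Look for a "PushedFilters" entry on the Parquet scan in the physical plan.
df.filter(df("some_col") === 42).explain(true)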

Best,



Re: Kryo On Spark 1.6.0

2017-01-10 Thread Yang Cao
If you don't mind, could you please share the Scala solution with me? I tried to use 
Kryo but it seemed not to work at all. I hope to get a practical example. Thanks.
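
For later readers of the archive, a minimal Scala sketch of the registration that is 
commonly suggested for the error quoted below (the SparkConf-based setup is an 
assumption; from Java the same class can be obtained with 
Class.forName("scala.collection.mutable.WrappedArray$ofRef")):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
// Register the Scala wrapper class that Kryo complains about, plus your own classes.
conf.registerKryoClasses(Array(classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))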
> On January 10, 2017, at 19:10, Enrico DUrso wrote:
> 
> Hi,
> 
> I am trying to use Kryo on Spark 1.6.0.
> I am able to register my own classes and it works, but when I set 
> "spark.kryo.registrationRequired" to true, I get an error about a Scala class:
> "Class is not registered: scala.collection.mutable.WrappedArray$ofRef".
> 
> Has any of you already solved this issue in Java? I found the code to solve it in 
> Scala, but I am unable to register this class in Java.
> 
> Cheers,
> 
> enrico
> 
> 



Do you use spark 2.0 in work?

2016-10-31 Thread Yang Cao
Hi guys,

Just out of personal interest: is Spark 2.0 a production-ready version? Is there any 
company using it as its main version in daily work? Thanks.



Re: java.net.UnknownHostException

2016-08-02 Thread Yang Cao
Actually, I just ran into the same problem. Could you share some code around the 
error, so I can figure out whether I can help you? Also, is "s001.bigdata" the name 
of your namenode?
> On August 2, 2016, at 17:22, pseudo oduesp wrote:
> 
> someone can help me please 
> 
> 2016-08-01 11:51 GMT+02:00 pseudo oduesp:
> hi, 
> I get the following errors when I try using PySpark 2.0 with IPython on YARN. 
> Can someone help me, please?
> java.lang.IllegalArgumentException: java.net.UnknownHostException: 
> s001.bigdata.;s003.bigdata;s008bigdata.
> at 
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
> at 
> org.apache.hadoop.crypto.key.kms.KMSClientProvider.getDelegationTokenService(KMSClientProvider.java:823)
> at 
> org.apache.hadoop.crypto.key.kms.KMSClientProvider.addDelegationTokens(KMSClientProvider.java:779)
> at 
> org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension.addDelegationTokens(KeyProviderDelegationTokenExtension.java:86)
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2046)
> at 
> org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$obtainTokensForNamenodes$1.apply(YarnSparkHadoopUtil.scala:133)
> at 
> org.apache.spark.deploy.yarn.YarnSparkHadoopUtil$$anonfun$obtainTokensForNamenodes$1.apply(YarnSparkHadoopUtil.scala:130)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
> at 
> org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.obtainTokensForNamenodes(YarnSparkHadoopUtil.scala:130)
> at 
> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:367)
> at 
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:834)
> at 
> org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:167)
> at 
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:149)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:500)
> at 
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:240)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:236)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> at 
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> at py4j.GatewayConnection.run(GatewayConnection.java:211)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: 
> java.net.UnknownHostException:s001.bigdata.;s003.bigdata;s008bigdata.
> 
> 
> thanks 
> 



create external table from partitioned avro file

2016-07-28 Thread Yang Cao
Hi,

I am using Spark 1.6 and I want to create a Hive external table on top of partitioned 
Avro files. Currently, I can't find any built-in API to do this. I tried 
write.format().saveAsTable with the format com.databricks.spark.avro, and it returned 
an error saying it can't find a Hive SerDe for this. I have the same problem with the 
createExternalTable() function. Spark itself seems able to recognize the Avro format. 
I need help with this task; any suggestion is welcome.
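
For reference, one workaround that is often suggested is to issue the Hive DDL 
directly; a minimal sketch, assuming a HiveContext named sqlContext, with a 
placeholder table name, partition column, schema file and paths:
sqlContext.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS events_avro
  PARTITIONED BY (d STRING)
  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
  LOCATION 'hdfs:///path/to/avro/data'
  TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schema.avsc')
""")
// Register the existing partition directories under the table location.
sqlContext.sql("MSCK REPAIR TABLE events_avro")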

THX



get hdfs file path in spark

2016-07-25 Thread Yang Cao
Hi,
Being new here, I hope to get some assistance from you guys. I wonder whether there 
is an elegant way to list the directories under a given path. For example, I have a 
path on HDFS like /a/b/c/d/e/f, and I am given /a/b/c; is there any straightforward 
way to get the path /a/b/c/d/e? I think I can do it with the help of a regex, but I 
still hope to find an easier way that makes my code cleaner. My env: Spark 1.6, 
language: Scala.
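
For reference, a sketch of the kind of thing I mean, using the Hadoop FileSystem API 
rather than a regex (the paths are placeholders and I am assuming a SparkContext 
named sc is available; applied recursively this would walk down to /a/b/c/d/e):
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
// List the immediate children of the given prefix and keep only directories.
val subDirs = fs.listStatus(new Path("/a/b/c"))
  .filter(_.isDirectory)
  .map(_.getPath.toString)
subDirs.foreach(println)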


Thx