Re: Custom Log4j layout on YARN = ClassNotFoundException

2016-04-22 Thread andrew.rowson
Apologies, Outlook for Mac is ridiculous. Copy and paste of the original message below:

-

I’m running into a strange issue when trying to use a custom Log4j layout for 
Spark (1.6.1) on YARN (CDH). The layout is: 
https://github.com/michaeltandy/log4j-json

If I use a log4j.properties file (supplied with --files) with:

log4j.appender.consolejson=org.apache.log4j.ConsoleAppender
log4j.appender.consolejson.target=System.err
log4j.appender.consolejson.layout=uk.me.mjt.log4jjson.SimpleJsonLayout


And supply the log4j-json.1.0.jar with ‘--jars’ to spark-submit, the driver and 
executors throw an exception right at the start of the log file:

log4j:ERROR Could not instantiate class [uk.me.mjt.log4jjson.SimpleJsonLayout].
java.lang.ClassNotFoundException: uk.me.mjt.log4jjson.SimpleJsonLayout

However, a simple spark job that does something like:

sc.parallelize(List(1, 2, 3)).foreach { i =>
  Class.forName("uk.me.mjt.log4jjson.SimpleJsonLayout")
}

doesn’t throw an error. So the class can be loaded, just not in time for Log4j 
to use it.
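For illustration, a small diagnostic along the following lines (a hypothetical 
helper, not part of the original job) shows which classloaders can actually see 
the layout class. Presumably Log4j, which lives in the spark-assembly on the 
system classpath, initialises before and apart from the classloader that --jars 
entries are added to:

// Hypothetical check: which classloaders can resolve the layout class?
def visibleTo(cl: ClassLoader): Boolean =
  try { Class.forName("uk.me.mjt.log4jjson.SimpleJsonLayout", false, cl); true }
  catch { case _: ClassNotFoundException => false }

println(s"system loader:  ${visibleTo(ClassLoader.getSystemClassLoader)}")
println(s"context loader: ${visibleTo(Thread.currentThread.getContextClassLoader)}")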

I've tried a few different options to get it to work (including the jar on the 
YARN application classpath, the Spark executor classpath, etc.) and they all 
produce the same result. The only thing that seems to work is building a custom 
spark-assembly with the Maven dependency added to core/pom.xml. That way the 
layout is included in the spark-assembly jar, and I get the JSON log output I 
want.

Is there a classloading issue in Log4j when using --jars? I can't see why the 
layout works when it's bundled into spark-assembly but not when it's supplied 
via --jars.
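One hedged workaround sketch, untested and shown only to illustrate the timing 
point (the layout class name is taken from the properties file above; its no-arg 
constructor is assumed): configure the appender programmatically from 
application code, where the --jars entries are visible, instead of from 
log4j.properties at JVM startup:

// Sketch only: wire up the JSON layout from user code rather than from
// log4j.properties, so the class is resolved via the application classloader.
import org.apache.log4j.{ConsoleAppender, Logger}
import uk.me.mjt.log4jjson.SimpleJsonLayout

val jsonAppender = new ConsoleAppender(new SimpleJsonLayout(), ConsoleAppender.SYSTEM_ERR)
Logger.getRootLogger.addAppender(jsonAppender)

This only takes effect in the JVM (and from the point) where it runs, so 
executors would need the same call, e.g. from a lazily initialised object; that 
limitation is part of why bundling the layout into the assembly ends up being 
the more convenient fix.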


From:  Ted Yu 
Date:  Friday, 22 April 2016 at 14:55
To:  Andrew Rowson 
Cc:  "user@spark.apache.org" 
Subject:  Re: Custom Log4j layout on YARN = ClassNotFoundException

There is not much in the body of the email. 

Can you elaborate on what issue you encountered?

Thanks

On Fri, Apr 22, 2016 at 2:27 AM, Rowson, Andrew G. (TR Technology & Ops) 
 wrote:





Re: Driver running out of memory - caused by many tasks?

2015-08-27 Thread andrew.rowson
Thanks for this tip.

I ran it in yarn-client mode with driver-memory = 4G and took a dump once the 
heap got close to 4G.

 num     #instances         #bytes  class name
----------------------------------------------
   1:        446169     3661137256  [J
   2:       2032795      222636720  [C
   3:          8992       57717200  [B
   4:       2031653       48759672  java.lang.String
   5:        920263       22086312  scala.collection.immutable.$colon$colon
   6:        156233       20495936  [Ljava.lang.Object;


So, somehow there’s ~3.5 GB worth of long arrays in there, each one apparently 
using about 8 KB of heap. Digging into these instances reveals that the vast 
majority are identical long[] of size 1024 with every element set to -1 
(8,216 bytes each).
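As a rough sanity check on those numbers (just illustrative arithmetic over the 
figures in the histogram above):

// ~446k long[] instances at ~8.2 KB each accounts for essentially all of the [J bytes.
val instances        = 446169L
val bytesPerInstance = 8216L            // long[1024]: 8192 bytes of data plus the array header
println(instances * bytesPerInstance)   // ~3.67e9, close to the 3661137256 bytes reported for [J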

I’m at a bit of a loss as to what this could be. Any ideas how I can understand 
what’s happening?

Andrew



From:  Ashish Rangole
Date:  Thursday, 27 August 2015 15:24
To:  Andrew Rowson
Cc:  user, "ewan.le...@realitymine.com"
Subject:  Re: Driver running out of memory - caused by many tasks?


I suggest taking a heap dump of the driver process using jmap. Then open that 
dump in a tool like VisualVM to see which object(s) are taking up heap space. It 
is easy to do. We did this and found out that in our case it was the data 
structure that stores info about stages, jobs and tasks. There can be other 
reasons as well, of course.
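If that stage/job/task bookkeeping is also what is filling the heap here, one 
knob worth knowing about is how much of that history the driver retains. A 
sketch with illustrative values only (the settings are Spark's standard UI 
retention options):

import org.apache.spark.SparkConf

// Sketch: cap how many completed jobs and stages the driver keeps around for the UI.
val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "200")
  .set("spark.ui.retainedStages", "200")

These only trim the UI/listener history, of course; they don't change what a 
single large running stage needs.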

On Aug 27, 2015 4:17 AM,  wrote:

I should have mentioned: yes I am using Kryo and have registered KeyClass and 
ValueClass.



I guess it’s not clear to me what is actually taking up space on the driver 
heap - I can’t see how it can be data with the code that I have.

On 27/08/2015 12:09, "Ewan Leith"  wrote:

>Are you using the Kryo serializer? If not, have a look at it, it can save a 
>lot of memory during shuffles
>
>https://spark.apache.org/docs/latest/tuning.html
>
>I did a similar task and had various issues with the volume of data being 
>parsed in one go, but that helped a lot. It looks like the main difference 
>from what you're doing to me is that my input classes were just a string and a 
>byte array, which I then processed once it was read into the RDD, maybe your 
>classes are memory heavy?
>
>
>Thanks,
>Ewan
>
>-Original Message-
>From: andrew.row...@thomsonreuters.com 
>[mailto:andrew.row...@thomsonreuters.com]
>Sent: 27 August 2015 11:53
>To: user@spark.apache.org
>Subject: Driver running out of memory - caused by many tasks?
>
>I have a spark v.1.4.1 on YARN job where the first stage has ~149,000 tasks 
>(it’s reading a few TB of data). The job itself is fairly simple - it’s just 
>getting a list of distinct values:
>
>val days = spark
>  .sequenceFile(inputDir, classOf[KeyClass], classOf[ValueClass])
>  .sample(withReplacement = false, fraction = 0.01)
>  .map(row => row._1.getTimestamp.toString("yyyy-MM-dd"))
>  .distinct()
>  .collect()
>
>The cardinality of the ‘day’ is quite small - there’s only a handful. However, 
>I’m frequently running into OutOfMemory issues on the driver. I’ve had it fail 
>with 24GB RAM, and am currently nudging it upwards to find out where it works. 
>The ratio between input and shuffle write in the distinct stage is about 
>3TB:7MB. On a smaller dataset, it works without issue on a smaller (4GB) heap. 
>In YARN cluster mode, I get a failure message similar to:
>
>Container 
> [pid=36844,containerID=container_e15_1438040390147_4982_01_01] is running 
> beyond physical memory limits. Current usage: 27.6 GB of 27 GB physical 
> memory used; 29.5 GB of 56.7 GB virtual memory used. Killing container.
>
>
>Is the driver running out of memory simply due to the number of tasks, or is 
>there something about the job program that’s causing it to put a lot of data 
>into the driver heap and go oom? If the former, is there any general guidance 
>about the amount of memory to give to the driver as a function of how many 
>tasks there are?
>
>Andrew



Re: Driver running out of memory - caused by many tasks?

2015-08-27 Thread andrew.rowson
I should have mentioned: yes I am using Kryo and have registered KeyClass and 
ValueClass.



I guess it’s not clear to me what is actually taking up space on the driver 
heap - I can’t see how it can be data with the code that I have.

On 27/08/2015 12:09, "Ewan Leith"  wrote:

>Are you using the Kryo serializer? If not, have a look at it, it can save a 
>lot of memory during shuffles
>
>https://spark.apache.org/docs/latest/tuning.html
>
>I did a similar task and had various issues with the volume of data being 
>parsed in one go, but that helped a lot. It looks like the main difference 
>from what you're doing to me is that my input classes were just a string and a 
>byte array, which I then processed once it was read into the RDD, maybe your 
>classes are memory heavy?
>
>
>Thanks,
>Ewan
>
>-Original Message-
>From: andrew.row...@thomsonreuters.com 
>[mailto:andrew.row...@thomsonreuters.com] 
>Sent: 27 August 2015 11:53
>To: user@spark.apache.org
>Subject: Driver running out of memory - caused by many tasks?
>
>I have a spark v.1.4.1 on YARN job where the first stage has ~149,000 tasks 
>(it’s reading a few TB of data). The job itself is fairly simple - it’s just 
>getting a list of distinct values:
>
>val days = spark
>  .sequenceFile(inputDir, classOf[KeyClass], classOf[ValueClass])
>  .sample(withReplacement = false, fraction = 0.01)
>  .map(row => row._1.getTimestamp.toString("yyyy-MM-dd"))
>  .distinct()
>  .collect()
>
>The cardinality of the ‘day’ is quite small - there’s only a handful. However, 
>I’m frequently running into OutOfMemory issues on the driver. I’ve had it fail 
>with 24GB RAM, and am currently nudging it upwards to find out where it works. 
>The ratio between input and shuffle write in the distinct stage is about 
>3TB:7MB. On a smaller dataset, it works without issue on a smaller (4GB) heap. 
>In YARN cluster mode, I get a failure message similar to:
>
>Container 
> [pid=36844,containerID=container_e15_1438040390147_4982_01_01] is running 
> beyond physical memory limits. Current usage: 27.6 GB of 27 GB physical 
> memory used; 29.5 GB of 56.7 GB virtual memory used. Killing container.
>
>
>Is the driver running out of memory simply due to the number of tasks, or is 
>there something about the job program that’s causing it to put a lot of data 
>into the driver heap and go oom? If the former, is there any general guidance 
>about the amount of memory to give to the driver as a function of how many 
>tasks there are?
>
>Andrew



Driver running out of memory - caused by many tasks?

2015-08-27 Thread andrew.rowson
I have a spark v.1.4.1 on YARN job where the first stage has ~149,000 tasks 
(it’s reading a few TB of data). The job itself is fairly simple - it’s just 
getting a list of distinct values:

val days = spark
  .sequenceFile(inputDir, classOf[KeyClass], classOf[ValueClass])
  .sample(withReplacement = false, fraction = 0.01)
  .map(row => row._1.getTimestamp.toString("yyyy-MM-dd"))
  .distinct()
  .collect()

The cardinality of the ‘day’ is quite small - there’s only a handful. However, 
I’m frequently running into OutOfMemory issues on the driver. I’ve had it fail 
with 24GB RAM, and am currently nudging it upwards to find out where it works. 
The ratio between input and shuffle write in the distinct stage is about 
3TB:7MB. On a smaller dataset, it works without issue on a smaller (4GB) heap. 
In YARN cluster mode, I get a failure message similar to:

Container 
[pid=36844,containerID=container_e15_1438040390147_4982_01_01] is running 
beyond physical memory limits. Current usage: 27.6 GB of 27 GB physical memory 
used; 29.5 GB of 56.7 GB virtual memory used. Killing container.


Is the driver running out of memory simply due to the number of tasks, or is 
there something about the job program that’s causing it to put a lot of data 
into the driver heap and go oom? If the former, is there any general guidance 
about the amount of memory to give to the driver as a function of how many 
tasks there are?

Andrew



Duplicate key when sorting BytesWritable with Kryo?

2015-01-30 Thread andrew.rowson
I've found a strange issue when trying to sort a lot of data in HDFS using
Spark 1.2.0 (CDH 5.3.0). My data is in SequenceFiles and the key is a class
that derives from BytesWritable (the value is also a BytesWritable). I'm
using a custom KryoSerializer to serialize the underlying byte array
(it basically writes the length followed by the byte array).
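For what it's worth, a serializer of that shape might look like the sketch below 
(the class name is hypothetical; only the write-the-length-then-the-bytes idea 
comes from the description above):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.io.BytesWritable

// Sketch of a Kryo serializer that writes the length followed by the raw bytes.
class BytesWritableSerializer extends Serializer[BytesWritable] {
  override def write(kryo: Kryo, output: Output, bw: BytesWritable): Unit = {
    output.writeInt(bw.getLength)
    output.writeBytes(bw.getBytes, 0, bw.getLength)
  }
  override def read(kryo: Kryo, input: Input, cls: Class[BytesWritable]): BytesWritable = {
    val length = input.readInt()
    new BytesWritable(input.readBytes(length))  // readBytes allocates a fresh array
  }
}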

My spark job looks like this:

spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
  .sortByKey()
  .map(t => t._1)
  .saveAsTextFile(outputPath)

CustomKey extends BytesWritable, adds a toString method and some other
helper methods that extract and convert parts of the underlying byte[].

This should simply output a series of text files which contain the sorted
list of keys. The problem is that under certain circumstances I get many
duplicate keys. The number of records output is correct, but it appears that
large chunks of the output are simply copies of the last record in that
chunk. E.g. instead of [1,2,3,4,5,6,7,8,9] I'll see [9,9,9,9,9,9,9,9,9].

This appears to happen only above certain input data volumes, and it appears
to coincide with shuffle spill. For a job where shuffle spill (memory and disk)
is 0 B, the data is correct. If there is any spill, I see the duplicate
behaviour. Oddly, the shuffle write is much smaller when there is a spill:
the non-spill job has 18.8 GB of input and 14.9 GB of shuffle write, whereas
the spill job has 24.2 GB of input and only 4.9 GB of shuffle write. I'm
guessing some sort of compression is happening on the duplicate identical
values?

Oddly, I can fix this issue if I adjust my Scala code to insert a map step
before the call to sortByKey():

.map(t => (new CustomKey(t._1), t._2))

This constructor is just:

public CustomKey(CustomKey left) { this.set(left); }

Why does this work? I've no idea.
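A plausible explanation, though it's an assumption on my part rather than 
something confirmed here: Hadoop's SequenceFile record reader reuses a single 
Writable instance for every record in a partition, so once records are buffered 
for sorting or spilled, many of them can end up referring to the same 
(last-mutated) object, and the copy constructor breaks that sharing. A sketch of 
the job with both key and value copied defensively, for illustration only:

// Sketch: deep-copy each Writable before sorting, since the record reader may
// hand Spark the same reused instance for every record (assumed cause).
spark.sequenceFile(inputPath, classOf[CustomKey], classOf[BytesWritable])
  .map { case (k, v) =>
    val keyCopy = new CustomKey(k)             // copy constructor shown above
    val valCopy = new BytesWritable()
    valCopy.set(v.getBytes, 0, v.getLength)    // copy the value's bytes as well
    (keyCopy, valCopy)
  }
  .sortByKey()
  .map(_._1)
  .saveAsTextFile(outputPath)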

The spark job is running in yarn-client mode with all the default
configuration values set. Using the external shuffle service and disabling
spill compression makes no difference.

Is this a bug?



