RE: SequenceFile and object reuse

2015-11-19 Thread andrew.rowson
As I understand it, it's down to how Hadoop FileInputFormats work, and
questions of mutability. If you were to read a file from Hadoop via an
InputFormat with a simple Java program, the InputFormat's RecordReader
creates a single, mutable instance of the Writable key class and a single,
mutable instance of the Writable value. When you loop through the records,
the RecordReader reuses those Writable instances by deserializing the
underlying bytes from the file into the instances 1 record at a time. It's
up to the application to then copy whatever's needed out of those Writable
instances into something else if it wants to do something with them.
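
(For concreteness, here is a hedged sketch of that reuse pattern using Hadoop's
SequenceFile.Reader directly, which follows the same reuse convention as the
RecordReader described above; the path and key/value types are assumptions for
illustration only.)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}

val conf = new Configuration()
val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path("/tmp/data.seq")))
val key = new IntWritable()   // a single mutable instance...
val value = new Text()        // ...reused for every record
while (reader.next(key, value)) {
  // Copy out whatever you need here; holding a reference to `value` itself is
  // not enough, because its contents change on the next call to reader.next().
  println(s"${key.get()} -> ${value.toString}")
}
reader.close()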

 

It's exactly the same when using Spark as the application. When you create
an RDD of Writable objects by calling .sequenceFile, the RDD contains many
identical references to the exact same object instance. When Spark then does
a sort, cache or shuffle, it (I believe) optimizes on the assumption that
objects are immutable. The map step is therefore necessary, because it
creates a distinct, immutable copy of each record.
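
(A hedged sketch of that copy step, assuming a local SparkContext and a
made-up path; the safe version is essentially the map shown later in this
thread.)

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("seqfile-copy").setMaster("local[*]"))
val raw = sc.sequenceFile("/tmp/data.seq", classOf[IntWritable], classOf[Text])

// Unsafe to cache, sort or shuffle directly: the elements share the same
// mutable Writable instances, so a cached partition can appear to hold one
// repeated record.
// raw.cache()

// Safe: copy each record into immutable Scala values first.
val copied = raw.map { case (k, v) => (k.get(), v.toString) }.cache()
copied.collect().foreach(println)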

 

This is just an issue with the Hadoop InputFormat class. If you can write a
way of reading files from HDFS that doesn't use Hadoop's classes (though I'm
not sure why you would; a simple map is far easier), then the map would
potentially be unnecessary.

 

Andrew

 

From: jeff saremi [mailto:jeffsar...@hotmail.com] 
Sent: 19 November 2015 05:35
To: Jeff Zhang 
Cc: dev@spark.apache.org
Subject: RE: SequenceFile and object reuse

 

You're not seeing the issue because you perform one additional "map". 

map{case (k,v) => (k.get(), v.toString)}

Instead of being able to use the Text that was read, you had to create a
tuple out of the string form of the text.

That is exactly why I asked this question.

Why do we have to do this additional processing? What is the rationale behind
it?
Are there other ways of reading a Hadoop file (or any other file) that would
not incur this additional step?

thanks

 

 


Date: Thu, 19 Nov 2015 13:26:31 +0800
Subject: Re: FW: SequenceFile and object reuse
From: zjf...@gmail.com  
To: jeffsar...@hotmail.com  
CC: dev@spark.apache.org  

Would this be an issue on the raw data ? I use the following simple code,
and don't hit the issue you mentioned. Or it would be better to share your
code. 

 

val rdd = sc.sequenceFile("/Users/hadoop/Temp/Seq", classOf[IntWritable], classOf[Text])
rdd.map { case (k, v) => (k.get(), v.toString) }.collect() foreach println

 

On Thu, Nov 19, 2015 at 12:04 PM, jeff saremi wrote:

I sent this to the user forum. I got no responses. Could someone here please
help? thanks
jeff

 




From: jeffsar...@hotmail.com  
To: u...@spark.apache.org  
Subject: SequenceFile and object reuse
Date: Fri, 13 Nov 2015 13:29:58 -0500

 

So we tried reading a SequenceFile in Spark and realized that all our
records ended up being the same.
Then one of us found this:

Note: Because Hadoop's RecordReader class re-uses the same Writable object
for each record, directly caching the returned RDD or directly passing it to
an aggregation or shuffle operation will create many references to the same
object. If you plan to directly cache, sort, or aggregate Hadoop writable
objects, you should first copy them using a map function.

Can anyone shed some light on this bizarre behavior and the decisions
behind it?
I would also like to know if anyone has been able to read a binary file
without incurring the additional map() suggested above. What format did you
use?

thanks

Jeff





 

-- 

Best Regards

Jeff Zhang





EOFException on History server reading in progress lz4

2015-09-03 Thread andrew.rowson
I'm trying to solve a problem of the history server spamming my logs with
EOFExceptions when it tries to read a history file from HDFS that is both
lz4 compressed and incomplete. The actual exception is:

java.io.EOFException: Stream ended prematurely
    at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:218)
    at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:192)
    at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:117)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
    at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
    at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:443)
    at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:278)
    at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$10.apply(FsHistoryProvider.scala:275)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:275)
    at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$1$$anon$2.run(FsHistoryProvider.scala:209)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The bit I'm struggling with is handling this in ReplayListenerBus.scala - I
tried adding the following to the try/catch:

case eof: java.io.EOFException =>
  logWarning(s"EOFException (probably due to incomplete lz4) at $sourceName", eof)

but this never seems to get triggered - it still dumps the whole exception
out to the log.

I feel like there's something basic I'm missing for the exception not to be
caught by the try/catch in ReplayListenerBus. Can anyone point me in the
right direction?
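
As a point of reference, here is a minimal, self-contained sketch (not
Spark's actual code) of reading a truncated lz4 file. Since the EOFException
surfaces lazily from the line iterator's hasNext/next while it refills from
the stream, the catch has to wrap the whole iteration, not just the point
where the stream is opened.

import java.io.{EOFException, FileInputStream}
import net.jpountz.lz4.LZ4BlockInputStream
import scala.io.Source

def replayLines(path: String)(handle: String => Unit): Unit = {
  val in = new LZ4BlockInputStream(new FileInputStream(path))
  try {
    // The EOFException is thrown during iteration, not when the stream is
    // opened, so the catch must cover the foreach as well.
    Source.fromInputStream(in).getLines().foreach(handle)
  } catch {
    case eof: EOFException =>
      println(s"Incomplete lz4 stream at $path: ${eof.getMessage}")
  } finally {
    in.close()
  }
}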

Thanks,

Andrew




RE: Spark builds: allow user override of project version at buildtime

2015-08-26 Thread andrew.rowson
So, I actually tried this, and it built without problems, but publishing the
artifacts to Artifactory ended up with some strangeness in the child poms,
where the property wasn't resolved. This leads to issues when pulling them
into other projects: "Could not find
org.apache.spark:spark-parent_2.10:${spark.version}."

There's conflicting information out on the web about whether this should or 
shouldn't work, and whether it is or isn't a good idea. Broad consensus is that 
this is actually a bit of a hack around Maven, so it's probably not something 
we should do.

I'll explore whether sbt is more flexible and does what's needed. 
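
(If it is, a hedged sketch of what that could look like in an sbt build
definition, where settings are plain Scala and can read system properties;
the property name spark.version.override is purely illustrative:)

// In build.sbt (sbt 0.13 syntax); falls back to the normal version if the
// property isn't supplied.
version in Global := sys.props.getOrElse("spark.version.override", "1.4.1")

// Invoked as, for example:
//   build/sbt -Dspark.version.override=1.4.1-custom-string publish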

Andrew

From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: 26 August 2015 03:12
To: Marcelo Vanzin van...@cloudera.com
Cc: Rowson, Andrew G. (FinancialRisk) andrew.row...@thomsonreuters.com; 
dev@spark.apache.org
Subject: Re: Spark builds: allow user override of project version at buildtime

This isn't really answering the question, but for what it is worth, I manage 
several different branches of Spark and publish custom named versions regularly 
to an internal repository, and this is *much* easier with SBT than with maven.  
You can actually link the Spark SBT build into an external SBT build and write 
commands that cross publish as needed.

For your case, something as simple as build/sbt 'set version in Global :=
"1.4.1-custom-string"' publish might do the trick.

On Tue, Aug 25, 2015 at 10:09 AM, Marcelo Vanzin van...@cloudera.com wrote:
On Tue, Aug 25, 2015 at 2:17 AM,  andrew.row...@thomsonreuters.com wrote:
 Then, if I wanted to do a build against a specific profile, I could also
 pass in a -Dspark.version=1.4.1-custom-string and have the output artifacts
 correctly named. The default behaviour should be the same. Child pom files
 would need to reference ${spark.version} in their parent section I think.

 Any objections to this?

Have you tried it? My understanding is that no project does that
because it doesn't work. To resolve properties you need to read the
parent pom(s), and if there's a variable reference there, well, you
can't do it. Chicken & egg.

--
Marcelo

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org





Spark builds: allow user override of project version at buildtime

2015-08-25 Thread andrew.rowson
I've got an interesting challenge in building Spark. For various reasons we
do a few different builds of spark, typically with a few different profile
options (e.g. against different versions of Hadoop, some with/without Hive
etc.). We mirror the spark repo internally and have a buildserver that
builds and publishes different Spark versions to an artifactory server. The
problem is that the output of each build is published with the version that
is in the pom.xml file - a build of Spark @tags/v1.4.1 always comes out with
an artefact version of '1.4.1'. However, because we may have three different
Spark builds for 1.4.1, it'd be useful to be able to override this version
at build time, so that we can publish 1.4.1, 1.4.1-cdh5.3.3 and maybe
1.4.1-cdh5.3.3-hive as separate artifacts. 

My understanding of Maven is that the /project/version value in the pom.xml
isn't overridable. At the moment, I've hacked around this by having a
pre-build task that rewrites the various pom files and adjusts the version to
a string that's correct for that particular build.

Would it be useful to instead populate the version from a Maven property,
which could then be overridden on the CLI? Something like:

<project>
  <version>${spark.version}</version>
  <properties>
    <spark.version>1.4.1</spark.version>
  </properties>
</project>

Then, if I wanted to do a build against a specific profile, I could also
pass in a -Dspark.version=1.4.1-custom-string and have the output artifacts
correctly named. The default behaviour should be the same. Child pom files
would need to reference ${spark.version} in their parent section I think.

Any objections to this?

Andrew

