Re: Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Harry Brundage
Are you certain that's happening, Jim? Why? What happens if you just do
sc.textFile(fileUri).count()? If I'm not mistaken, the Hadoop InputFormat
for gzip and the RDD wrapper around it already have the streaming
behaviour you wish for, but I could be wrong. Also, are you in PySpark or
Scala Spark?
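
For example, something like the following (hypothetical local setup and path) should stream through the gzip codec without any repartitioning, if I understand the HadoopRDD behaviour correctly:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical local setup and path; any file:// .gz URI behaves the same way.
    val conf  = new SparkConf().setAppName("gz-count").setMaster("local[*]")
    val sc    = new SparkContext(conf)
    val lines = sc.textFile("file:///tmp/data.gz")  // gzip is not splittable, so a single partition
    println(lines.partitions.length)                // expect 1
    println(lines.count())                          // rows are decompressed and counted as they stream by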

On Tue, Dec 16, 2014 at 1:22 PM, Jim Carroll jimfcarr...@gmail.com wrote:

 Is there a way to get Spark to NOT repartition/shuffle/expand a
 sc.textFile(fileUri) when the URI is a gzipped file?

 Expanding a gzipped file should be thought of as a transformation and not
 an action (if the analogy is apt). There is no need to fully create and
 fill out an intermediate RDD with the expanded data when it can be done one
 row at a time.
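
 For instance (a sketch with a made-up URI), a lazy map over the gzipped file followed by a reduce should only ever see one row at a time per partition:

     val lines   = sc.textFile("file:///tmp/data.gz")  // made-up URI
     val lengths = lines.map(_.length.toLong)          // transformation: lazy, row by row
     val total   = lengths.reduce(_ + _)               // action: per-partition results combined on the driver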







Re: Spark handling of a file://xxxx.gz Uri

2014-12-16 Thread Jim


Hi Harry,

Thanks for your response.

I'm working in Scala. When I call count, it expands the RDD inside the
count (since count is an action). You can see the call stack that results in
the failure of the job here:


ERROR DiskBlockObjectWriter - Uncaught exception while reverting partial writes to file /tmp/spark-local-20141216170458-964a/1d/temp_shuffle_4f46af09-5521-4fc6-adb1-c72839520560

java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:345)
    at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
    at org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
    at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:263)
    at java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
    at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:718)
    at org.apache.spark.serializer.JavaSerializationStream.flush(JavaSerializer.scala:51)
    at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(BlockObjectWriter.scala:173)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:774)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$stop$2.apply(ExternalSorter.scala:773)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.util.collection.ExternalSorter.stop(ExternalSorter.scala:773)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.stop(SortShuffleWriter.scala:93)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:74)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Notice that the task run (this is now doing a count) results in a shuffle,
during which it writes the intermediate RDD to disk (and fails when the
disk is full). This intermediate RDD/disk write is unnecessary.
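
For reference (a rough check on a local shell, with a hypothetical path), rdd.toDebugString prints the lineage, which shows whether a shuffle dependency is present at all before the action runs:

    val rdd = sc.textFile("file:///tmp/data.gz")   // hypothetical URI
    println(rdd.toDebugString)                     // HadoopRDD -> MapPartitionsRDD; no ShuffledRDD in this bare lineage
    println(rdd.count())                           // count sums per-partition counts on the driver

    // Any wide operation in the lineage (sortBy, distinct, reduceByKey,
    // repartition, ...) is what brings in SortShuffleWriter and spills
    // temp_shuffle_* files under spark.local.dir (/tmp by default).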


I even implemented a Seq[String] in terms of streaming the file and
called sc.parallelize(mySequence, 1), and THIS results in a call to
toArray on my sequence. Since the expanded data won't fit on disk, it
certainly won't fit in an array in memory.
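
As a point of comparison, a plain JVM stream over the same file (hypothetical path) touches one row at a time and needs neither an in-memory array nor a disk spill:

    import java.io.FileInputStream
    import java.util.zip.GZIPInputStream
    import scala.io.Source

    // Hypothetical path; each decompressed row is seen once and then discarded.
    val gz = new GZIPInputStream(new FileInputStream("/tmp/data.gz"))
    try {
      var n = 0L
      Source.fromInputStream(gz).getLines().foreach(_ => n += 1)
      println(s"rows: $n")
    } finally gz.close()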


Thanks for taking the time to respond.

Jim
