Yeah, so the problem is that countByValue returns *all* values and their counts to your machine. If you just want the top 10, try this:
# do a distributed count using reduceByKey
counts = data.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

# swap the (key, count) pairs into (count, key) and then sort in descending order
# (named sorted_counts so it doesn't shadow Python's builtin sorted)
sorted_counts = counts.map(lambda (key, count): (count, key)).sortByKey(False)

# take the top 10 elements
top = sorted_counts.take(10)

Matei

On Feb 24, 2014, at 1:48 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:

> Hi,
>   I'm using pyspark for the first time on a realistic dataset (a few hundred
> GBs), but I have been seeing a lot of errors in the pyspark shell. This might
> be because I am not using pyspark correctly?
>
> But here is what I was trying:
>
> extract_subs.take(2)
> # returns [u'867430', u'867429']
> extract_subs_count = sorted(extract_subs.countByValue().items())
>
> ...to do the frequency count. Somewhere in between, I see one error:
>
> 14/02/24 13:43:44 INFO DAGScheduler: Failed to run countByValue at <stdin>:1
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 618, in countByValue
>     return self.mapPartitions(countPartition).reduce(mergeMaps)
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 499, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 463, in collect
>     bytesInJava = self._jrdd.collect().iterator()
>   File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>   File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o163.collect.
> : org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>       at scala.Option.foreach(Option.scala:236)
>       at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> And then it gets stuck:
>
> 14/02/24 13:43:50 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1267 on node07:34461 in memory (size: 15.5 MB, free: 279.2 MB)
> 14/02/24 13:43:51 INFO TaskSetManager: Finished TID 1257 in 22267 ms on node07 (progress: 64/66)
> 14/02/24 13:43:51 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1257 on node07:34461 in memory (size: 15.7 MB, free: 294.9 MB)
>
> And then when I press enter at the command prompt:
>
> extract_subs_count
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
> NameError: name 'extract_subs_count' is not defined
>
> So what I am wondering is whether pyspark is ready for realistic datasets, or
> whether I am doing something stupid.
> Thanks
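The traceback above also shows why countByValue runs out of memory here: it builds one dict per partition and then reduce()s them all down onto the driver, so every distinct value ends up in the driver's heap (and the NameError afterwards just means the failed job never assigned extract_subs_count). Roughly paraphrasing the pyspark rdd.py code referenced in the traceback, this is a sketch of what the call does, using the countPartition and mergeMaps names from the trace, not the exact source:

from collections import defaultdict

# build a local value -> count dict for each partition
def countPartition(iterator):
    counts = defaultdict(int)
    for obj in iterator:
        counts[obj] += 1
    yield counts

# merge two per-partition dicts into one
def mergeMaps(m1, m2):
    for (k, v) in m2.items():
        m1[k] += v
    return m1

# essentially what extract_subs.countByValue() does: the reduce() call
# collect()s every per-partition dict onto the driver, so the merged map
# (one entry per distinct value) must fit in the driver's heap
counts = extract_subs.mapPartitions(countPartition).reduce(mergeMaps)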
> On Mon, Feb 24, 2014 at 1:22 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> It's around 10 GB big.
> All I want is to do a frequency count, and then get the top 10 entries based
> on the count. How do I do this (again, in pyspark)?
> Thanks
>
>
> On Mon, Feb 24, 2014 at 1:19 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> collect() means to bring all the data back to the master node, and there
> might just be too much of it for that. How big is your file? If you can't
> bring it back to the master node, try saveAsTextFile to write it out to a
> filesystem (in parallel).
>
> Matei
>
> On Feb 24, 2014, at 1:08 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>
>> Hi,
>>   A newbie here. I am trying to do ETL on Spark. A few questions:
>>
>> I have a csv file with a header.
>> 1) How do I parse this file (as it has a header)?
>> 2) I was trying to follow the tutorial here:
>>    http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html
>> 3) I am trying to do a frequency count:
>>
>>    rows_key_value = rows.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y, 1).collect()
>>
>> After waiting a few minutes I see this error:
>>
>> java.lang.OutOfMemoryError: Java heap space
>>       at java.util.Arrays.copyOf(Arrays.java:2271)
>>       at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
>>       at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:50)
>>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
>>       at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>       at java.lang.Thread.run(Thread.java:744)
>> 14/02/24 13:06:45 INFO TaskSetManager: Starting task 13.0:0 as TID 331 on executor 2: node07 (PROCESS_LOCAL)
>> 14/02/24 13:06:45 INFO TaskSetManager: Serialized task 13.0:0 as 3809 bytes in 0 ms
>>
>> How do I fix this?
>> Thanks
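For the reduceByKey example in that last email, the collect() at the end is what drags the entire result set back to the driver. Following Matei's suggestions above, you would either take() a small sample or write the result out in parallel with saveAsTextFile. A minimal sketch, where rows is the RDD from the email and the output path is just a hypothetical example:

# frequency count as before, but keep the result distributed
counts = rows.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)

# pull back only a handful of results for inspection...
print(counts.take(10))

# ...or write the full (key, count) result out in parallel from the workers
# (the path below is just an example)
counts.saveAsTextFile("hdfs:///tmp/row_counts")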