Yeah, so the problem is that countByValue returns *all* distinct values and 
their counts to your machine (the driver), and with this much data that can be 
too much to fit in memory. If you just want the top 10, try this:

# do a distributed count using reduceByKey
counts = data.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)

# swap the (key, count) pairs into (count, key), then sort in descending order
# (named sorted_counts so it does not shadow Python's built-in sorted())
sorted_counts = counts.map(lambda kv: (kv[1], kv[0])).sortByKey(False)

# take the top 10 elements
top = sorted_counts.take(10)
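
If you want to sanity-check the logic without a cluster, here is a rough plain-Python sketch of the same three steps on a toy input. It is only a local stand-in: top_n_by_count and the sample partitions are made up for illustration, and nothing in it calls Spark.

```python
from collections import Counter
from heapq import nlargest


def top_n_by_count(partitions, n=10):
    """Hypothetical local helper (not a Spark API) mirroring the steps above."""
    # Step 1: count inside each partition, then merge the partial counts.
    # This plays the role of map(...) followed by reduceByKey(...).
    totals = Counter()
    for part in partitions:
        totals.update(Counter(part))

    # Steps 2 and 3: build (count, key) pairs and keep only the n largest.
    # This plays the role of the swap, sortByKey(False) and take(n).
    return nlargest(n, ((count, key) for key, count in totals.items()))


# Toy data standing in for two partitions of an RDD (made-up values).
partitions = [
    ['867430', '867429', '867430'],
    ['867430', '867429', '12345'],
]
top = top_n_by_count(partitions, n=2)
print(top)
```

The point is the same as in the Spark version: only the n winning (count, key) pairs are kept at the end, rather than a sorted copy of every distinct value.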

Matei

On Feb 24, 2014, at 1:48 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:

> Hi,
>    I am using pyspark for the first time on a realistic dataset (a few hundred GB), 
> but I have been seeing a lot of errors in the pyspark shell. This might be because 
> I am not using pyspark correctly.
> 
> But here is what I was trying:
> extract_subs.take(2) 
> //returns [u'867430', u'867429']
> extract_subs_count = sorted(extract_subs.countByValue().items())
> to do the frequency count.
> Partway through, I see this error:
> 
> 14/02/24 13:43:44 INFO DAGScheduler: Failed to run countByValue at <stdin>:1
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 618, in countByValue
>     return self.mapPartitions(countPartition).reduce(mergeMaps)
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 499, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 463, in collect
>     bytesInJava = self._jrdd.collect().iterator()
>   File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
>   File "/home/hadoop/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o163.collect.
> : org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: java.lang.OutOfMemoryError: Java heap space
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
> 
> 
> And then it gets stuck:
> 14/02/24 13:43:50 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1267 on node07:34461 in memory (size: 15.5 MB, free: 279.2 MB)
> 14/02/24 13:43:51 INFO TaskSetManager: Finished TID 1257 in 22267 ms on node07 (progress: 64/66)
> 14/02/24 13:43:51 INFO BlockManagerMasterActor$BlockManagerInfo: Removed taskresult_1257 on node07:34461 in memory (size: 15.7 MB, free: 294.9 MB)
> 
> And then when I press enter..
> on the command prompt:
> extract_subs_count 
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
> NameError: name 'extract_subs_count' is not defined
> 
> 
> 
> So what I am wondering is whether pyspark is ready for realistic datasets, or 
> whether I am doing something stupid.
> Thanks
> 
> 
> 
> 
> On Mon, Feb 24, 2014 at 1:22 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> It's around 10 GB.
> All I want is to do a frequency count and then get the top 10 entries based on 
> count.
> How do I do this (again, in pyspark)?
> Thanks
> 
> 
> On Mon, Feb 24, 2014 at 1:19 PM, Matei Zaharia <matei.zaha...@gmail.com> 
> wrote:
> collect() means to bring all the data back to the master node, and there 
> might just be too much of it for that. How big is your file? If you can’t 
> bring it back to the master node try saveAsTextFile to write it out to a 
> filesystem (in parallel).
> 
> Matei
> 
> On Feb 24, 2014, at 1:08 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> 
>> Hi,
>>   A newbie here. I am trying to do ETL on Spark. A few questions.
>> 
>> I have a CSV file with a header.
>> 1) How do I parse this file (given that it has a header)?
>> 2) I was trying to follow the tutorial here: 
>> http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html
>> 
>> 3) I am trying to do a frequency count:
>>      rows_key_value = rows.map(lambda x:(x[1],1)).reduceByKey(lambda x,y:x+y,1).collect()
>> 
>> 
>> After waiting for a few minutes I see this error:
>> java.lang.OutOfMemoryError: Java heap space
>>     at java.util.Arrays.copyOf(Arrays.java:2271)
>>     at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:50)
>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
>>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:744)
>> 14/02/24 13:06:45 INFO TaskSetManager: Starting task 13.0:0 as TID 331 on executor 2: node07 (PROCESS_LOCAL)
>> 14/02/24 13:06:45 INFO TaskSetManager: Serialized task 13.0:0 as 3809 bytes in 0 ms
>> 
>> 
>> How do I fix this?
>> Thanks
>> 
>> 
> 
> 
> 
