Hi

I am working on a Spark POC. I created an EC2 cluster on AWS using
spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2.

Below is a simple Python program that I am running in an IPython
notebook. The notebook server is running on my Spark master. If I run
the program more than once against my large data set, I get a "GC
overhead limit exceeded" OutOfMemoryError. I run it each time by
re-running the notebook cell. I can run it against my smaller data set
a couple of times without problems.

I launch it using:

pyspark --master $MASTER_URL --total-executor-cores 2
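
I have not set any explicit memory options. I assume I could also pass
--driver-memory at launch to give the driver JVM more headroom, e.g.
the line below, but I do not know whether that is the right knob:

pyspark --master $MASTER_URL --total-executor-cores 2 --driver-memory 4g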


Any idea how I can debug this?

Kind regards

Andy

Using the master console I see:
* There is only one application running (this is what I expect)
* There are 2 workers, each on a different slave (this is what I expect)
* Each worker is using 1 core (this is what I expect)
* Each worker is using 6154 MB of memory (seems reasonable)

* Alive Workers: 3
* Cores in use: 6 Total, 2 Used
* Memory in use: 18.8 GB Total, 12.0 GB Used
* Applications: 1 Running, 5 Completed
* Drivers: 0 Running, 0 Completed
* Status: ALIVE

The data I am working with is small:

I collected this data using the Spark Streaming Twitter utilities. All
I do is capture tweets, convert them to JSON, and store them as strings
in HDFS.
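
The collection code is not shown here, but it had roughly the following
shape. This is an illustrative sketch, not my exact code:
socketTextStream stands in for the Twitter source, and the batch
interval and output path are made up.

import json
from pyspark.streaming import StreamingContext

# Illustrative sketch only: socketTextStream stands in for the real
# Twitter receiver, and the 1-second batch interval is made up
ssc = StreamingContext(sc, 1)
tweets = ssc.socketTextStream("localhost", 9999)

# Convert each tweet to a JSON string and write it to HDFS. Note that
# saveAsTextFiles() creates a new directory of part files for every
# batch interval, which is why the samples contain so many files
tweets.map(lambda status: json.dumps({"text": status})) \
      .saveAsTextFiles("hdfs:///largeSample/tweets")

ssc.start()
ssc.awaitTermination()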

$ hadoop fs -count hdfs:///largeSample hdfs:///smallSample

       13226       240098         3839228100
           1       228156           39689877

(The columns are directory count, file count, and content size in
bytes; the rows are largeSample and smallSample, in that order. So even
the large sample is only ~3.8 GB, although it is spread over 240,098
files in 13,226 directories.)

My Python code (I am using Python 3.4):

import json
import datetime

startTime = datetime.datetime.now()

#dataURL = "hdfs:///largeSample"
dataURL = "hdfs:///smallSample"

# sc is the SparkContext created by the pyspark shell
tweetStrings = sc.textFile(dataURL)

# take(2) is where the error below is raised; it fails while computing
# the partitions, before any data is read
t2 = tweetStrings.take(2)
print(t2[1])
print("\n\nelapsed time:%s" % (datetime.datetime.now() - startTime))

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-2e23bfb203a9> in <module>()
      8 tweetStrings = sc.textFile(dataURL)
      9 
---> 10 t2 = tweetStrings.take(2)
     11 print (t2[1])
     12 print("\n\nelapsed time:%s" % (datetime.datetime.now() - startTime))

/root/spark/python/pyspark/rdd.py in take(self, num)
   1267         """
   1268         items = []
-> 1269         totalParts = self.getNumPartitions()
   1270         partsScanned = 0
   1271 

/root/spark/python/pyspark/rdd.py in getNumPartitions(self)
    354         2
    355         """
--> 356         return self._jrdd.partitions().size()
    357 
    358     def filter(self, f):

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/root/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
     38             s = e.java_exception.toString()

/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o65.partitions.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at java.lang.String.substring(String.java:1969)
        at java.net.URI$Parser.substring(URI.java:2869)
        at java.net.URI$Parser.parseHierarchical(URI.java:3106)
        at java.net.URI$Parser.parse(URI.java:3053)
        at java.net.URI.<init>(URI.java:746)
        at org.apache.hadoop.fs.Path.initialize(Path.java:145)
        at org.apache.hadoop.fs.Path.<init>(Path.java:71)
        at org.apache.hadoop.fs.Path.<init>(Path.java:50)
        at org.apache.hadoop.hdfs.protocol.HdfsFileStatus.getFullPath(HdfsFileStatus.java:215)
        at org.apache.hadoop.hdfs.DistributedFileSystem.makeQualified(DistributedFileSystem.java:293)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:352)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:862)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:887)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:185)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:65)
        at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:47)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)



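In case it helps, my plan for narrowing this down is to time just the
partition computation, since the stack trace shows the OutOfMemoryError
is raised inside FileInputFormat.getSplits()/listStatus() while listing
the HDFS files, before any records are read. Something like this
(untested sketch):

import datetime

startTime = datetime.datetime.now()

# getNumPartitions() forces the same getSplits()/listStatus() work that
# fails above, without reading any tweet data
numPartitions = sc.textFile("hdfs:///largeSample").getNumPartitions()

print("partitions:%s" % numPartitions)
print("elapsed time:%s" % (datetime.datetime.now() - startTime))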