Hello,
I am working on a machine learning project, currently using
spark-1.4.1-bin-hadoop2.6 in local mode on a laptop (Ubuntu 14.04 running
on a Dell laptop with [email protected] GHz * 4 cores, 15.6 GB RAM). I am
working in Python from an IPython notebook.
My first problem is performance: when working with a DataFrame created from a
CSV file (2.7 GB, 1900 features) with an inferred schema, Spark takes
3 minutes 30 seconds to count the 145231 rows using 4 cores. Computing the
statistics of a single feature takes even longer, for example:
--------------------------------------------START AT: 2015-09-21 08:56:41.136947
+-------+------------------+
|summary|          VAR_1933|
+-------+------------------+
|  count|            145231|
|   mean| 8849.839111484464|
| stddev|3175.7863998269395|
|    min|                 0|
|    max|              9999|
+-------+------------------+
--------------------------------------------FINISH AT: 2015-09-21 09:02:49.452260
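For reference, the elapsed time between the two timestamps printed above can be computed directly. This is a small sketch using only the Python standard library; the timestamp strings are copied from the output, and the variable names are illustrative.

```python
from datetime import datetime

# Format of the START AT / FINISH AT timestamps shown in the output.
FMT = "%Y-%m-%d %H:%M:%S.%f"

start = datetime.strptime("2015-09-21 08:56:41.136947", FMT)
finish = datetime.strptime("2015-09-21 09:02:49.452260", FMT)

# Subtracting two datetimes gives a timedelta.
elapsed = finish - start
elapsed_seconds = elapsed.total_seconds()

print(elapsed)          # 0:06:08.315313
print(elapsed_seconds)
```

That is a little over six minutes for the statistics of one column, compared with 3 minutes 30 seconds for the row count.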
So, my first question would be what configuration parameters to set in order
to improve this performance?
I tried some explicit configuration in the IPython notebook, but specifying
resources explicitly when creating the Spark configuration made performance
worse. That is,
    config = (SparkConf().setAppName("cleankaggle").setMaster("local[4]")
              .set("spark.jars", jar_path))
ran twice as fast as:
    config = (SparkConf().setAppName("cleankaggle").setMaster("local[4]")
              .set("spark.jars", jar_path)
              .set("spark.driver.memory", "2g")
              .set("spark.python.worker.memory", "3g"))
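One thing that may be worth checking here: the Spark configuration documentation notes that in client mode spark.driver.memory should not be set through SparkConf inside the application, because the driver JVM has already started at that point, and that it should be passed with --driver-memory or a properties file instead. In local mode the tasks run inside the driver JVM, so that heap is the one that matters. Below is a minimal sketch, assuming the notebook launches the JVM through PySpark's PYSPARK_SUBMIT_ARGS environment variable and that the variable is set before the SparkContext is created. The helper name build_submit_args and the "8g" value are illustrative assumptions, not measured recommendations.

```python
import os


def build_submit_args(driver_memory, master="local[4]"):
    """Build a PYSPARK_SUBMIT_ARGS string (hypothetical helper for illustration)."""
    # The trailing "pyspark-shell" tells spark-submit what to launch.
    return "--master {0} --driver-memory {1} pyspark-shell".format(
        master, driver_memory)


# This has to run before the SparkContext is created, i.e. before the JVM starts.
# "8g" is an assumed value for a 15.6 GB machine, not a tested setting.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args("8g")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

After this, the SparkConf would only need the application name and the jar path; whether the larger heap actually helps would have to be confirmed by re-running the count.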
****************************
Secondly, when I do the one-hot encoding (I tried two different ways of
keeping the results), I never get as far as displaying head(1) of the
resulting DataFrame. The encoding function is:
    from pyspark.ml.feature import StringIndexer, OneHotEncoder

    def OHE_transform(categ_feat, df_old):
        # Index the string column, then one-hot encode the resulting index column.
        outputcolname = categ_feat + "_ohe_index"
        outputcolvect = categ_feat + "_ohe_vector"
        stringIndexer = StringIndexer(inputCol=categ_feat,
                                      outputCol=outputcolname)
        indexed = stringIndexer.fit(df_old).transform(df_old)
        encoder = OneHotEncoder(inputCol=outputcolname, outputCol=outputcolvect)
        encoded = encoder.transform(indexed)
        return encoded
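To make explicit what the two stages compute, here is a plain-Python sketch of the idea, not Spark's implementation. StringIndexer assigns indices by descending label frequency, and the encoder turns each index into an indicator vector. The helper names and the sample column are made up for illustration; Spark's OneHotEncoder returns sparse vectors and, depending on version and parameters, may omit one category, whereas this sketch keeps all of them.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent label first (ties broken by name)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}


def one_hot(index, size):
    """Return a dense indicator vector with a single 1.0 at position `index`."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec


# Hypothetical categorical column: "a" appears 3 times, "b" twice, "c" once.
column = ["a", "b", "a", "c", "a", "b"]
mapping = string_index(column)
encoded = [one_hot(mapping[v], len(mapping)) for v in column]

print(mapping)
print(encoded[3])
```

Each categorical feature therefore contributes one index column and one vector column whose width grows with the number of distinct labels in that feature.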
The two ways of keeping the results are shown below:
1)
    result = OHE_transform(list_top_feat[0], df_top_categ)
    for item in list_top_feat[1:]:
        result = OHE_transform(item, result)
        result.head(1)
2)
    df_result = OHE_transform("VAR_A", df_top_categ)
    df_result_1 = OHE_transform("VAR_B", df_result)
    df_result_2 = OHE_transform("VAR_C", df_result_1)
    ...
    df_result_12 = OHE_transform("VAR_X", df_result_11)
    df_result_12.head(1)
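Both variants build the same chain of transformations; they differ only in where head(1) is called. Here is a small sketch of that equivalence, using a stand-in function that only tracks column names. The function fake_ohe_transform and the three-feature list are hypothetical, and no Spark is involved.

```python
from functools import reduce


def fake_ohe_transform(categ_feat, columns):
    """Stand-in for OHE_transform: return the column list plus the two new columns."""
    return columns + [categ_feat + "_ohe_index", categ_feat + "_ohe_vector"]


list_top_feat = ["VAR_A", "VAR_B", "VAR_C"]
base_columns = list(list_top_feat)

# Variant 1: a loop that keeps reassigning one name.
looped = fake_ohe_transform(list_top_feat[0], base_columns)
for item in list_top_feat[1:]:
    looped = fake_ohe_transform(item, looped)

# The same chain written as a fold over the feature list.
folded = reduce(lambda cols, feat: fake_ohe_transform(feat, cols),
                list_top_feat, base_columns)

print(looped == folded)
print(len(folded))
```

In the real function, each call adds two columns and runs one StringIndexer fit over the data, so the work grows with every feature that is encoded.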
In the first approach, at the third iteration of the for loop, when head(1)
was supposed to be printed, the IPython notebook stayed in the "Kernel busy"
state for several hours, until I interrupted the kernel.
The second approach got through all the transformations (note that here I
removed the intermediate head(1) calls), but it failed with an "out of
memory" error at the single, final head(1), which I paste below:
===============================================
df_result_12.head(1)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-29-e952d1766630> in <module>()
----> 1 df_result_12.head(1)
/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
649 rs = self.head(1)
650 return rs[0] if rs else None
--> 651 return self.take(n)
652
653 @ignore_unicode_prefix
/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in take(self, num)
305 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
306 """
--> 307 return self.limit(num).collect()
308
309 @ignore_unicode_prefix
/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in collect(self)
279 """
280 with SCCallSiteSync(self._sc) as css:
--> 281 port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
282 rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
283 cls = _create_cls(self.schema)
/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 35.0 failed 1 times, most recent failure: Lost task 3.0 in stage 35.0 (TID 253, localhost): java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3075)
    at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2871)
    at java.io.ObjectInputStream.readString(ObjectInputStream.java:1638)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1341)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
=================================================================
In the second approach, continuing after this error resulted in:
Traceback (most recent call last):
  File "/usr/lib/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python2.7/SocketServer.py", line 649, in __init__
    self.handle()
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/pyspark/serializers.py", line 544, in read_int
    raise EOFError
EOFError
ERROR:py4j.java_gateway:Error while sending or receiving.
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 479, in send_command
    raise Py4JError("Answer from Java side is empty")
Py4JError: Answer from Java side is empty
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 425, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 425, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server
Traceback (most recent call last):
  File "/home/camelia/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 425, in start
    self.socket.connect((self.address, self.port))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
================================================
So once again it seems that I need to change some configuration parameters
to prevent such out-of-memory errors.
Thank you very much in advance.
Camelia
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Need-for-advice-performance-improvement-and-out-of-memory-resolution-tp24886.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.