Hi All,

I'm running Spark Streaming (Python) with Direct Kafka, and I'm seeing memory
usage slowly climb until it kills the job after a few days.
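
For context, the stream is created with the direct (receiver-less) Kafka API. A
minimal sketch of the setup is below; the broker, topic, and batch interval are
placeholders rather than my real values:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="click-stream")
    ssc = StreamingContext(sc, 60)  # placeholder batch interval

    # Direct (no receiver) Kafka stream; broker and topic are placeholders
    stream = KafkaUtils.createDirectStream(
        ssc, ["clicks"], {"metadata.broker.list": "kafka-broker:9092"})

    # per-batch processing elided ...

    ssc.start()
    ssc.awaitTermination()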

Everything runs fine at first, but after a few days the job starts throwing
"error: [Errno 104] Connection reset by peer", followed by
"java.lang.OutOfMemoryError: GC overhead limit exceeded" when I try to
access the web UI.

I'm not using any fancy settings, pretty much just the defaults; I give each
executor (4 cores) 14G of memory and the driver 40G.
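
In SparkConf terms the non-default settings boil down to roughly this (the keys
below are standard Spark properties; everything else is left at the defaults):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .set("spark.executor.cores", "4")
            .set("spark.executor.memory", "14g")
            # driver memory normally has to be set before the driver JVM starts
            # (e.g. via submit options); shown here only for completeness
            .set("spark.driver.memory", "40g"))
    sc = SparkContext(conf=conf)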

I looked through the mailing list and around the web. There were a few reports
of streaming jobs running out of memory, but with no apparent solutions. If
anyone has insights into this, please let me know!

Best,
Augustus

-------------------------------------------------------------------------------------

Detailed Logs below:

The first error I see is this:

> 15/12/02 19:05:03 INFO scheduler.DAGScheduler: Job 270661 finished: call at /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py:1206, took 0.560671 s
> 15/12/02 19:05:09 ERROR python.PythonRDD: Error while sending iterator
> java.net.SocketTimeoutException: Accept timed out
>   at java.net.PlainSocketImpl.socketAccept(Native Method)
>   at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
>   at java.net.ServerSocket.implAccept(ServerSocket.java:530)
>   at java.net.ServerSocket.accept(ServerSocket.java:498)
>   at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:613)
> Traceback (most recent call last):
>   File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>     r = self.func(t, *rdds)
>   File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
>     func = lambda t, rdd: old_func(rdd)
>   File "/root/spark-projects/click-flow/click-stream.py", line 141, in <lambda>
>     keys.foreachRDD(lambda rdd: rdd.foreachPartition(lambda part: save_sets(part, KEY_SET_NAME, ITEMS_PER_KEY_LIMIT_KEYS)))
>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 766, in foreachPartition
>     self.mapPartitions(func).count()  # Force evaluation
>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum
>     return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold
>     vals = self.mapPartitions(func).collect()
>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 774, in collect
>     return list(_load_from_socket(port, self._jrdd_deserializer))
>   File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
>     for item in serializer.load_stream(rf):
>   File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
>     yield self._read_with_length(stream)
>   File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
>     length = read_int(stream)
>   File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 542, in read_int
>     length = stream.read(4)
>   File "/usr/lib64/python2.6/socket.py", line 383, in read
>     data = self._sock.recv(left)
> error: [Errno 104] Connection reset by peer
> 15/12/02 19:05:09 INFO scheduler.JobScheduler: Finished job streaming job 1449082320000 ms.1 from job set of time 1449082320000 ms
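
For reference, the line of my code in that traceback (click-stream.py:141) boils
down to the pattern below; save_sets and the two constants are my own helpers,
stubbed out here with placeholder values. As the traceback shows, PySpark
implements foreachPartition via mapPartitions(func).count(), so every batch ends
with a small collect back to the driver over a local socket, and that socket
read is where the connection reset shows up.

    KEY_SET_NAME = "key_set"          # placeholder value
    ITEMS_PER_KEY_LIMIT_KEYS = 1000   # placeholder value

    def save_sets(partition, key_set_name, items_per_key_limit):
        # Stub: the real function writes the partition's records to external storage
        for record in partition:
            pass

    # keys is the DStream produced by the upstream Kafka processing (not shown)
    keys.foreachRDD(lambda rdd: rdd.foreachPartition(
        lambda part: save_sets(part, KEY_SET_NAME, ITEMS_PER_KEY_LIMIT_KEYS)))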
Then, when I try to access the web UI, this error is thrown, which leads me to
believe it has something to do with memory filling up:

> 15/12/02 19:43:34 WARN servlet.ServletHandler: Error for /jobs/
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64)
>   at java.lang.StringBuilder.<init>(StringBuilder.java:97)
>   at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:46)
>   at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:51)
>   at scala.xml.Attribute$class.toString1(Attribute.scala:96)
>   at scala.xml.UnprefixedAttribute.toString1(UnprefixedAttribute.scala:16)
>   at scala.xml.MetaData.buildString(MetaData.scala:202)
>   at scala.xml.Utility$.serialize(Utility.scala:216)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>   at scala.xml.Utility$.serialize(Utility.scala:227)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>   at scala.xml.Utility$.serialize(Utility.scala:227)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>   at scala.xml.Utility$.serialize(Utility.scala:227)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
>   at scala.xml.Utility$.serialize(Utility.scala:227)

-- 
Augustus Hong
Data Analytics | Branch Metrics <http://branch.io/>
m 650-391-3369 | e augus...@branch.io