Hi all, I'm running Spark Streaming (Python) with the direct Kafka approach, and I'm seeing memory usage slowly climb until it kills the job after a few days.
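For reference, here's roughly how I launch the job -- the only non-default knobs are the memory/core settings mentioned below; the script path is the one from the traceback, and everything else is left at Spark's defaults:

```shell
# Sketch of the launch command: 4 cores and 14G per executor, 40G for
# the driver, everything else default. Master URL omitted; the script
# path is the one that appears in the traceback.
spark-submit \
  --executor-cores 4 \
  --executor-memory 14g \
  --driver-memory 40g \
  /root/spark-projects/click-flow/click-stream.py
```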
Everything runs fine at first, but after a few days the job starts issuing "error: [Errno 104] Connection reset by peer", followed by "java.lang.OutOfMemoryError: GC overhead limit exceeded" when I try to access the web UI. I'm not using any fancy settings, pretty much just the defaults, and I give each executor (4 cores) 14G of memory and the driver 40G. I looked through the mailing list and around the web; there were a few reports of streaming jobs running out of memory, but with no apparent solutions. If anyone has insights into this, please let me know!

Best,
Augustus

-------------------------------------------------------------------------------------
Detailed logs below.

The first error I see is this:

15/12/02 19:05:03 INFO scheduler.DAGScheduler: Job 270661 finished: call at /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py:1206, took 0.560671 s
15/12/02 19:05:09 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
        at java.net.ServerSocket.implAccept(ServerSocket.java:530)
        at java.net.ServerSocket.accept(ServerSocket.java:498)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:613)
Traceback (most recent call last):
  File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/root/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/root/spark-projects/click-flow/click-stream.py", line 141, in <lambda>
    keys.foreachRDD(lambda rdd: rdd.foreachPartition(lambda part: save_sets(part, KEY_SET_NAME, ITEMS_PER_KEY_LIMIT_KEYS)))
  File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 766, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold
    vals = self.mapPartitions(func).collect()
  File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 774, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/root/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/root/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 542, in read_int
    length = stream.read(4)
  File "/usr/lib64/python2.6/socket.py", line 383, in read
    data = self._sock.recv(left)
error: [Errno 104] Connection reset by peer
15/12/02 19:05:09 INFO scheduler.JobScheduler: Finished job streaming job 1449082320000 ms.1 from job set of time 1449082320000 ms

And then, when I try to access the web UI, this error is thrown, leading me to believe that it has something to do with memory being full:

15/12/02 19:43:34 WARN servlet.ServletHandler: Error for /jobs/
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:64)
        at java.lang.StringBuilder.<init>(StringBuilder.java:97)
        at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:46)
        at scala.collection.mutable.StringBuilder.<init>(StringBuilder.scala:51)
        at scala.xml.Attribute$class.toString1(Attribute.scala:96)
        at scala.xml.UnprefixedAttribute.toString1(UnprefixedAttribute.scala:16)
        at scala.xml.MetaData.buildString(MetaData.scala:202)
        at scala.xml.Utility$.serialize(Utility.scala:216)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
        at scala.xml.Utility$.serialize(Utility.scala:227)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
        at scala.xml.Utility$.serialize(Utility.scala:227)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
        at scala.xml.Utility$.serialize(Utility.scala:227)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.xml.Utility$$anonfun$sequenceToXML$2.apply(Utility.scala:256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.xml.Utility$.sequenceToXML(Utility.scala:256)
        at scala.xml.Utility$.serialize(Utility.scala:227)

--
Augustus Hong
Data Analytics | Branch Metrics | http://branch.io/
m: 650-391-3369 | e: augus...@branch.io
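P.S. Tracing the frames in rdd.py above: foreachPartition is implemented as mapPartitions(func).count(), and count → sum → fold ends in collect(), so even a side-effect-only foreach round-trips per-partition results to the driver through _load_from_socket -- which is exactly the read that dies with Errno 104. Here's a tiny Spark-free model of that call chain (ToyRDD is just a toy stand-in I wrote, not Spark's API):

```python
import operator
from functools import reduce

# Toy, Spark-free model of the call chain in the traceback:
# foreachPartition -> count -> sum -> fold -> collect, mirroring the
# implementations shown at rdd.py lines 766/1006/997/871/774.
class ToyRDD(object):
    def __init__(self, partitions):
        self.partitions = partitions          # list of per-partition lists

    def mapPartitions(self, f):
        return ToyRDD([list(f(iter(p))) for p in self.partitions])

    def collect(self):
        # In real PySpark this is where _load_from_socket() streams the
        # results back to the driver -- the read that hit Errno 104.
        return [x for p in self.partitions for x in p]

    def fold(self, zero, op):
        vals = self.mapPartitions(lambda it: iter([reduce(op, it, zero)])).collect()
        return reduce(op, vals, zero)

    def sum(self):
        return self.mapPartitions(lambda x: iter([sum(x)])).fold(0, operator.add)

    def count(self):
        return self.mapPartitions(lambda i: iter([sum(1 for _ in i)])).sum()

    def foreachPartition(self, f):
        def func(it):
            f(it)
            return iter([])
        self.mapPartitions(func).count()      # force evaluation (rdd.py:766)

# Even a side-effect-only foreach still ends in count()/collect():
seen = []
ToyRDD([[1, 2], [3, 4, 5]]).foreachPartition(lambda part: seen.extend(part))
```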