Thanks, that makes sense.
So it must be that this queue - which is kept because of the UDF - is the
one running out of memory, because without the UDF field there is no
out-of-memory error, and the UDF field itself is pretty small, so it is
unlikely that it alone would take us above the memory limit.

In any case, thanks for your help. I think I now understand how the UDFs
and the fields, together with the number of rows, can result in our
out-of-memory scenario.

On Tue, Aug 9, 2016 at 5:06 PM, Davies Liu <dav...@databricks.com> wrote:

> When you have a Python UDF, only the inputs to the UDF are passed into the
> Python process, but all other fields that are used together with the result
> of the UDF are kept in a queue and then joined with the result from Python.
> The length of this queue depends on the number of rows under processing by
> Python (or in the buffer of the Python process). The amount of memory
> required also depends on how many fields are used in the results.
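>
> Conceptually something like this (only a sketch of the idea in Python, not
> the actual Spark code; all the names here are made up):
>
>     from collections import deque
>
>     queue = deque()
>     for row in input_rows:
>         queue.append(row)                # the whole row is buffered in the JVM
>         send_to_python(udf_inputs(row))  # only the UDF inputs cross to Python
>     for result in results_from_python():
>         row = queue.popleft()            # joined back with the buffered row
>         emit(row + (result,))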
>
> On Tue, Aug 9, 2016 at 11:09 AM, Zoltan Fedor <zoltan.1.fe...@gmail.com> wrote:
> >> Does this mean you only have 1.6G memory for the executor (the rest left
> >> for Python)?
> >> The cached table could take 1.5G, which means almost nothing is left for
> >> other things.
> > True. I have also tried with memoryOverhead set to 800 (10% of the 8Gb
> > memory), but there was no difference. The "GC overhead limit exceeded"
> > error is still the same.
> >
> >> Python UDFs do require some buffering in the JVM; the size of the
> >> buffering depends on how many rows are under processing by the Python
> >> process.
> > I did some more testing in the meantime.
> > Leaving the UDFs as-is, but removing some of the other, static columns
> > from the above SELECT FROM command has stopped the memoryOverhead error
> > from occurring. I have more than enough memory to store the results with
> > all the static columns, and when the UDFs are removed and only the static
> > columns remain, it also runs fine. This makes me believe that the UDFs
> > and the many columns cause the issue together. Maybe when you have UDFs,
> > the memory usage somehow depends on the amount of data in the whole row,
> > including fields that are not actually used by the UDF. Maybe the UDF
> > serialization to Python serializes the whole row instead of just the
> > attributes of the UDF?
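> >
> > For example, roughly this variant of the above query (same UDF, static
> > columns removed) ran without the memoryOverhead error:
> >
> >     results_df = sqlContext.sql("""SELECT
> >         test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP
> >     FROM ma""")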
> >
> > On Mon, Aug 8, 2016 at 5:59 PM, Davies Liu <dav...@databricks.com> wrote:
> >>
> >> On Mon, Aug 8, 2016 at 2:24 PM, Zoltan Fedor <zoltan.1.fe...@gmail.com> wrote:
> >> > Hi all,
> >> >
> >> > I have an interesting issue trying to use UDFs from SparkSQL in Spark
> >> > 2.0.0 using pyspark.
> >> >
> >> > There is a big table (5.6 billion rows, 450Gb in memory) loaded into
> >> > 300 executors' memory in SparkSQL, on which we would do some
> >> > calculations using UDFs in pyspark.
> >> > If I run my SQL on only a portion of the data (filtering by one of the
> >> > attributes), let's say 800 million records, then all works well. But
> >> > when I run the same SQL on all the data, then I receive
> >> > "java.lang.OutOfMemoryError: GC overhead limit exceeded" from
> >> > basically all of the executors.
> >> >
> >> > It seems to me that pyspark UDFs in SparkSQL might have a memory
> >> > leak, causing this "GC overhead limit exceeded" error.
> >> >
> >> > Details:
> >> >
> >> > - using Spark 2.0.0 on a Hadoop YARN cluster
> >> >
> >> > - 300 executors, each with 2 CPU cores and 8Gb memory
> >> >   (spark.yarn.executor.memoryOverhead=6400)
> >>
> >> Does this mean you only have 1.6G memory for the executor (the rest left
> >> for Python)?
> >> The cached table could take 1.5G, which means almost nothing is left for
> >> other things.
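> >> (If the 8Gb is the whole YARN container, then 8192MB minus the 6400MB of
> >> memoryOverhead would leave only roughly 1.6-1.8G for the JVM heap.)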
> >>
> >> Python UDFs do require some buffering in the JVM; the size of the
> >> buffering depends on how many rows are under processing by the Python
> >> process.
> >>
> >> > - a table of 5.6 billion rows loaded into the memory of the executors
> >> >   (taking up 450Gb of memory), partitioned evenly across the executors
> >> >
> >> > - creating even the simplest UDF in SparkSQL causes the 'GC overhead
> >> >   limit exceeded' error if running on all records. Running the same on
> >> >   a smaller dataset (~800 million rows) does succeed. With no UDF, the
> >> >   query succeeds on the whole dataset.
> >> >
> >> > - simplified pyspark code:
> >> >
> >> > from pyspark.sql.types import StringType
> >> >
> >> > def test_udf(var):
> >> >     """test udf that will always return a"""
> >> >     return "a"
> >> >
> >> > sqlContext.registerFunction("test_udf", test_udf, StringType())
> >> >
> >> > sqlContext.sql("""CACHE TABLE ma""")
> >> >
> >> > results_df = sqlContext.sql("""SELECT SOURCE, SOURCE_SYSTEM,
> >> >     test_udf(STANDARD_ACCOUNT_STREET_SRC) AS TEST_UDF_OP,
> >> >     ROUND(1.0 - (levenshtein(STANDARD_ACCOUNT_CITY_SRC, STANDARD_ACCOUNT_CITY_SRC)
> >> >         / CASE WHEN LENGTH(STANDARD_ACCOUNT_CITY_SRC) > LENGTH(STANDARD_ACCOUNT_CITY_SRC)
> >> >               THEN LENGTH(STANDARD_ACCOUNT_CITY_SRC)
> >> >               ELSE LENGTH(STANDARD_ACCOUNT_CITY_SRC)
> >> >           END), 2) AS SCORE_ED_STANDARD_ACCOUNT_CITY,
> >> >     STANDARD_ACCOUNT_STATE_SRC, STANDARD_ACCOUNT_STATE_UNIV
> >> > FROM ma""")
> >> >
> >> > results_df.registerTempTable("m")
> >> > sqlContext.cacheTable("m")
> >> >
> >> > results_df = sqlContext.sql("""SELECT COUNT(*) FROM m""")
> >> > print(results_df.take(1))
> >> >
> >> >
> >> > - the error thrown on the executors:
> >> >
> >> > 16/08/08 15:38:17 ERROR util.Utils: Uncaught exception in thread stdout writer for /hadoop/cloudera/parcels/Anaconda/bin/python
> >> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> >> >     at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:503)
> >> >     at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:61)
> >> >     at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> >> >     at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$1.apply(BatchEvalPythonExec.scala:64)
> >> >     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> >> >     at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1076)
> >> >     at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1091)
> >> >     at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1129)
> >> >     at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1132)
> >> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> >> >     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> >> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> >> >     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
> >> >     at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
> >> >     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
> >> >     at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
> >> > 16/08/08 15:38:17 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
> >> >
> >> >
> >> > Has anybody experienced these "GC overhead limit exceeded" errors with
> >> > pyspark UDFs before?
> >> >
> >> > Thanks,
> >> > Zoltan
> >> >
