Re: Python UDF performance at large scale

2015-06-25 Thread Davies Liu
I'm thinking that the batched synchronous version will either be too slow
(with a small batch size) or prone to OOM (with a large batch size). If
it's not that hard, you can give it a try.


Re: Python UDF performance at large scale

2015-06-25 Thread Justin Uang
Sweet, filed here: https://issues.apache.org/jira/browse/SPARK-8632


Re: Python UDF performance at large scale

2015-06-24 Thread Davies Liu
Fair points, I also like simpler solutions.

The overhead of a Python task could be a few milliseconds, which
means we should also evaluate them in batches (one Python task per batch).

Decreasing the batch size for UDFs sounds reasonable to me, together
with other tricks to reduce the data in the socket/pipe buffers.

BTW, what do your UDFs look like? How about using Jython to run
simple Python UDFs (those without external libraries)?
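
To make that distinction concrete, here is an illustrative sketch (the UDF
bodies are made up, and numpy is only a stand-in for "an external library"):
the first function is the kind of pure-Python UDF a Jython-based path could
in principle evaluate inside the JVM, while the second depends on a C
extension and would still need a CPython worker.

def simple_udf(s):
    # Pure-Python string manipulation, no external libraries involved.
    return s.strip().lower() if s is not None else None

def heavyweight_udf(values):
    # Relies on a C-extension library (numpy), so it cannot run under Jython.
    import numpy as np
    return float(np.percentile(np.array(values, dtype=float), 95))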


Re: Python UDF performance at large scale

2015-06-24 Thread Punyashloka Biswal
Hi Davies,

In general, do we expect people to use CPython only for heavyweight UDFs
that invoke an external library? Are there any examples of using Jython,
especially performance comparisons to Java/Scala and CPython? When using
Jython, do you expect the driver to send code to the executor as a string,
or is there a good way to serialize Jython lambdas?
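
As a rough sketch of the "send code to the executor as a string" option (the
helper below is hypothetical, not an existing Spark or Jython API), the driver
could ship the UDF's source text and the executor could compile it in its own
interpreter:

UDF_SOURCE = """
def my_udf(value):
    return value.strip().lower()
"""

def load_udf_from_source(source, name):
    # Build the function inside an isolated namespace on the executor.
    namespace = {}
    exec(compile(source, "<shipped-udf>", "exec"), namespace)
    return namespace[name]

if __name__ == "__main__":
    udf = load_udf_from_source(UDF_SOURCE, "my_udf")
    print(udf("  Hello World  "))  # -> "hello world"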

(For context, I was unable to serialize Nashorn lambdas when I tried to use
them in Spark.)

Punya

Re: Python UDF performance at large scale

2015-06-24 Thread Justin Uang
Correct, I was running with a batch size of about 100 when I did the tests,
because I was worried about deadlocks. Do you have any concerns regarding
the batched synchronous version of communication between the Java and
Python processes, and if not, should I file a ticket and start writing
it?

Re: Python UDF performance at large scale

2015-06-24 Thread Davies Liu
From your comment, the 2x improvement only happens when the batch size
is 1, right?

On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang justin.u...@gmail.com wrote:
 FYI, just submitted a PR to Pyrolite to remove their StopException.
 https://github.com/irmen/Pyrolite/pull/30

 With my benchmark, removing it basically made it about 2x faster.


Re: Python UDF performance at large scale

2015-06-23 Thread Justin Uang
// + punya

Thanks for your quick response!

I'm not sure that using an unbounded buffer is a good solution to the
locking problem. For example, in the situation where I had 500 columns, I
am in fact storing 499 extra columns on the Java side, which might make me
OOM if I have to store many rows. In addition, if I am using an
AutoBatchedSerializer, the Java side might have to write 1 << 16 == 65536
rows before Python starts outputting elements, in which case the Java side
has to buffer 65536 complete rows. In general it seems fragile to rely on
blocking behavior in the Python coprocess. By contrast, it's very easy to
verify the correctness and performance characteristics of the synchronous
blocking solution.
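
As a back-of-envelope illustration of that buffering concern (the 32
bytes-per-value figure is an assumption, not a measurement):

rows = 1 << 16        # 65536 rows written before Python emits anything
columns = 500
bytes_per_value = 32  # assumed average size of one boxed column value
buffered_gib = rows * columns * bytes_per_value / 2.0 ** 30
print("{0:.2f} GiB buffered on the Java side".format(buffered_gib))  # ~0.98 GiB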




Python UDF performance at large scale

2015-06-23 Thread Justin Uang
BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
but I have a proof-of-concept implementation that avoids caching the entire
dataset.

Hi,

We have been running into performance problems using Python UDFs with
DataFrames at large scale.

From the implementation of BatchPythonEvaluation, it looks like the goal
was to reuse the PythonRDD code. It caches the entire child RDD so that it
can do two passes over the data: one to feed the PythonRDD, and one to
join the Python lambda results with the original rows (which may contain
Java objects that should be passed through).

In addition, it caches all the columns, even the ones that don't need to be
processed by the Python UDF. In the case I was working with, I had a
500-column table, and I wanted to use a Python UDF for one column, and it
ended up caching all 500 columns.
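
For reference, here is a minimal sketch of the usage pattern in question (the
column names and the UDF itself are made up): a Python UDF applied to a single
column of a very wide DataFrame, which still drags all 500 columns through the
caching described above.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

sc = SparkContext(appName="udf-wide-table")
sqlContext = SQLContext(sc)

# A 500-column DataFrame with toy data.
num_columns = 500
columns = ["col_{0}".format(i) for i in range(num_columns)]
rows = [tuple(float(i + j) for j in range(num_columns)) for i in range(1000)]
df = sqlContext.createDataFrame(rows, columns)

# The UDF only touches col_0, but all 500 columns end up being cached for the join.
plus_one = udf(lambda x: x + 1.0, DoubleType())
df.withColumn("col_0_plus_one", plus_one(df["col_0"])).count()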

I have a working solution over here that does it in one pass over the data,
avoiding caching (
https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
With this patch, I go from a job that takes 20 minutes then OOMs, to a job
that finishes completely in 3 minutes. It is indeed quite hacky and prone
to deadlocks since there is buffering in many locations:

- NEW: the ForkingIterator LinkedBlockingDeque
- batching the rows before pickling them
- os buffers on both sides
- pyspark.serializers.BatchedSerializer

We can avoid deadlock by being very disciplined. For example, we can have
the ForkingIterator instead always do a check of whether the
LinkedBlockingDeque is full and if so:

Java
- flush the java pickling buffer
- send a flush command to the python process
- os.flush the java side

Python
- flush BatchedSerializer
- os.flush()
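
For illustration, here is a sketch of what the Python half of that flush
handling might look like (the length-prefixed framing and the flush marker are
assumptions for the sketch, not Spark's actual worker protocol):

import pickle
import struct

FLUSH_COMMAND = -1  # assumed marker sent by the Java side instead of a length

def _write_batch(outfile, batch):
    if batch:
        payload = pickle.dumps(batch)
        outfile.write(struct.pack(">i", len(payload)))
        outfile.write(payload)

def worker_loop(infile, outfile, udf, batch_size=100):
    pending = []
    while True:
        header = infile.read(4)
        if not header:
            break
        (length,) = struct.unpack(">i", header)
        if length == FLUSH_COMMAND:
            _write_batch(outfile, pending)  # drain whatever has been buffered
            pending = []
            outfile.flush()
            continue
        for row in pickle.loads(infile.read(length)):
            pending.append(udf(row))
            if len(pending) >= batch_size:
                _write_batch(outfile, pending)
                pending = []
    _write_batch(outfile, pending)
    outfile.flush()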

I haven't added this yet. This is getting very complex, however. Another
model would be to change the protocol between the Java side and the
worker to a synchronous request/response. This has the disadvantage that
the CPU isn't doing anything while a batch is being sent across, but it
has the huge advantage of simplicity. In addition, I imagine that the
bottleneck isn't the actual IO between the processes, but rather the
serialization of Java objects into pickled bytes, and the
deserialization/serialization plus the Python loops on the Python side.
Another advantage is that we won't be taking more than 100% CPU, since
only one thread is doing CPU work at a time between the executor and the
Python interpreter.
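
Here is a minimal sketch of that synchronous request/response model (plain
multiprocessing, not Spark's actual Java/Python protocol): the sending side
writes one batch and blocks until the worker returns its results, so neither
side ever has more than a single batch in flight.

from multiprocessing import Pipe, Process

def double(x):
    return x * 2

def worker(conn):
    # Evaluate one batch per request and send the results straight back.
    while True:
        batch = conn.recv()
        if batch is None:  # assumed end-of-stream marker
            break
        conn.send([double(x) for x in batch])
    conn.close()

def evaluate(rows, batch_size=100):
    parent_conn, child_conn = Pipe()
    proc = Process(target=worker, args=(child_conn,))
    proc.start()
    results = []
    for start in range(0, len(rows), batch_size):
        parent_conn.send(rows[start:start + batch_size])
        # Synchronous: wait for this batch to come back before sending the next.
        results.extend(parent_conn.recv())
    parent_conn.send(None)
    proc.join()
    return results

if __name__ == "__main__":
    print(evaluate(list(range(10)), batch_size=4))  # [0, 2, 4, ..., 18]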

Any thoughts would be much appreciated =)

Other improvements:
- extract some of the worker code out of PythonRDD so that we can do a
mapPartitions directly in BatchPythonEvaluation without resorting to the
hackery in ForkedRDD.compute(), which uses a cache to ensure that the other
RDD can get a handle to the same iterator.
- read elements and use a size estimator when creating the BlockingQueue, to
make sure that we don't store too many things in memory when batching
- patch Unpickler to not use StopException for control flow, which is
slowing down the Java side


Re: Python UDF performance at large scale

2015-06-23 Thread Davies Liu
Thanks for looking into it. I like the idea of having a
ForkingIterator. If we give it an unbounded buffer, then I think we will
not have the deadlock problem. The writing thread will be blocked by the
Python process, so there should not be many rows buffered (although that
is still a potential cause of OOM). At least this approach is better than
the current one.

Could you create a JIRA and send out the PR?




-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org