Re: understanding iterator of series to iterator of series pandasUDF
That's about right, but the iterator UDF is executed per partition, not per worker. Series-to-series is just simpler for cases where init does not matter.

On Tue, Jan 4, 2022, 12:25 PM Nitin Siwach wrote:
> I think I have an understanding now.
>
> 1. In iterator-to-iterator, the pandasUDF is called num_workers times. The entire iterator is passed to the pandasUDF once, and the iterator is then consumed arrow batch by arrow batch.
> 2. In series-to-series, the pandasUDF is called num_rows/arrow_batch_size times; the pandasUDF is called anew for each batch.
>
> I have one final doubt, though. *I now do not see the need for series-to-series pandasUDFs to exist. IMO every use case can be solved more efficiently with an iterator of series to iterator of series pandasUDF. Under what circumstances would a series-to-series pandasUDF be recommended over an iterator of series to iterator of series pandasUDF?*
>
> On Tue, Jan 4, 2022 at 7:40 PM Sean Owen wrote:
> >> [quoted reply trimmed; see the message below]
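The dispatch difference described above (called once per Arrow batch vs. once per partition) can be made concrete with a plain-pandas sketch. This is a simulation of how the worker invokes each UDF style, not Spark itself; the batch sizes and function names are illustrative only.

```python
# A plain-pandas sketch (no Spark required) of the dispatch difference:
# a series-to-series UDF is invoked once per Arrow batch, while an
# iterator UDF is invoked once per partition and pulls batches from an
# iterator. Batch contents here are illustrative, not Spark's internals.
from typing import Iterator, List

import pandas as pd

calls = {"series": 0, "iterator": 0}

def series_to_series_udf(batch: pd.Series) -> pd.Series:
    calls["series"] += 1          # called anew for every batch
    return batch * 2

def iterator_udf(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    calls["iterator"] += 1        # entered once per partition
    factor = 2                    # stand-in for one-time setup
    for batch in batches:
        yield batch * factor

# One partition split into three Arrow batches:
partition: List[pd.Series] = [pd.Series([1, 2]), pd.Series([3, 4]), pd.Series([5])]

out_a = [series_to_series_udf(b) for b in partition]   # 3 UDF calls
out_b = list(iterator_udf(iter(partition)))            # 1 UDF call
print(calls)  # {'series': 3, 'iterator': 1}
```

Both styles produce the same output batches; only the number of times the UDF function itself is entered differs.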
Re: understanding iterator of series to iterator of series pandasUDF
Yes, the UDF gets an iterator of pandas DataFrames, so your UDF will process them one at a time. The idea is to perform any expensive init once per partition: once before many DataFrames are processed, rather than before each DataFrame. The Arrow conversion is the same in either case. The benefit comes from processing batches as DataFrames in both cases.

On Tue, Jan 4, 2022 at 8:05 AM Nitin Siwach wrote:
> [original question trimmed; see the message below]
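The "expensive init once per partition" point above can be sketched the same way, again with plain pandas standing in for Spark's worker loop. `load_model` is a hypothetical stand-in for any costly one-time setup (loading a model, opening a connection); the counter just records how often it runs.

```python
# Sketch: init cost is paid once per batch in the series-to-series style,
# but once per partition in the iterator style. load_model() is a
# hypothetical stand-in for expensive setup; the counter tracks its calls.
from typing import Iterator

import pandas as pd

init_count = 0

def load_model():
    global init_count
    init_count += 1               # pretend this is slow
    return lambda s: s + 100

def predict_per_batch(batch: pd.Series) -> pd.Series:
    model = load_model()          # series-to-series style: init on every call
    return model(batch)

def predict_per_partition(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = load_model()          # iterator style: init once, reused per batch
    for batch in batches:
        yield model(batch)

batches = [pd.Series([1]), pd.Series([2]), pd.Series([3])]

[predict_per_batch(b) for b in batches]
n_series_style = init_count       # 3: once per batch

init_count = 0
list(predict_per_partition(iter(batches)))
n_iterator_style = init_count     # 1: once per partition
```

This is the whole advantage Sean describes: the Arrow conversion and per-batch work are identical, only the setup is amortized.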
understanding iterator of series to iterator of series pandasUDF
I understand pandasUDF as follows:

1. There are multiple partitions per worker.
2. Multiple arrow batches are converted per partition.
3. These are sent to the python process.
4. In the case of series to series, the pandasUDF is applied to each arrow batch one after the other? **(So, is it that (a) the vectorisation is at the arrow-batch level, but each batch, in turn, is processed sequentially by the worker? Or is it that (b) the arrow batches are combined after all have arrived, and the pandasUDF is then applied to the whole?)** I think it is (b), i.e. the arrow batches are combined. I have given my reasoning below.

Given this understanding and blackbishop's answer, I have the following further questions:

*How exactly do the iterator versions of pandasUDFs work?*

1. If there is some expensive initialization, then why can we not do that in the case of a series-to-series pandasUDF as well? In the case of iterator of series to iterator of series, the initialization is done once, shared across all the workers, and used for all the arrow batches. Why can the same process not be followed for a series-to-series pandasUDF? Initialize --> share to workers --> once all the arrow batches are combined on a worker, apply?
2. I can see that we might want to separate out the execution of I/O and of python code on arrow batches, so that while one batch is being read in, the pandasUDF is being run on the previous batch. (Why is this not done in the case of series to series? **This is why I think all the arrow batches are combined before running them through the pandasUDF. Because otherwise, the same I/O-parallelization benefits are available for a series-to-series pandasUDF as well.**)

One more question:

1. Since the output is an iterator of series, where is the vectorisation then? Is it that the pandasUDF is run on an entire arrow batch and the result is then emitted row by row? Or is the pandasUDF processing the arrow batches row by row and then emitting the result? (This loses vectorisation as I see it.)
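On the vectorisation question: in both UDF styles the unit of work is a whole batch (a pd.Series backed by an Arrow batch), and the operation inside the UDF is a vectorised pandas/NumPy call, never a per-row Python loop. The iterator style also yields results batch by batch, not row by row. A small sketch, with illustrative batch contents:

```python
# Sketch: an iterator-style UDF consumes and emits whole batches. Each
# `batch * 2` is a single vectorised operation over the entire batch,
# and one output batch is yielded per input batch (no per-row emission).
from typing import Iterator

import pandas as pd

def double(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for batch in batches:
        yield batch * 2           # one vectorised multiply per batch

batches = [pd.Series([1, 2, 3]), pd.Series([4, 5])]
results = list(double(iter(batches)))
# two output batches, matching the two input batches
```

So the vectorisation lives at the arrow-batch level in both styles; the iterator merely changes how many times the UDF function is entered.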