Hi,

I am copying Dr. Zaharia on this email, as I am quoting from his book (once
again, I may be wrong):
Chapter 5: Basic Structured Operations >> Creating Rows

You can create rows by manually instantiating a Row object with the values
that belong in each column. It’s important to note that only DataFrames
have schemas. Rows themselves do not have schemas. This means that if you
create a Row manually, you must specify the values in the same order as the
schema of the DataFrame to which they might be appended (we will see this
when we discuss creating DataFrames):
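
To make the ordering point above concrete, here is a minimal sketch in PySpark. The helper names `ordered_values` and `append_record` are hypothetical (they are not from the book), and the sketch assumes the record is a plain dict keyed by column name. Because a positional Row carries no schema, the values are put into the DataFrame's column order before the Row is built:

```python
def ordered_values(column_names, record):
    """Return the record's values in the column order of the target schema."""
    missing = [name for name in column_names if name not in record]
    if missing:
        raise KeyError(f"record is missing columns: {missing}")
    return tuple(record[name] for name in column_names)


def append_record(spark, df, record):
    """Append one dict-shaped record to df (hypothetical helper)."""
    # Imported lazily so ordered_values can be used without PySpark installed.
    from pyspark.sql import Row

    # Row(*values) is purely positional: it has no schema of its own.
    row = Row(*ordered_values(df.columns, record))
    # The schema comes from the DataFrame, not from the Row.
    return df.union(spark.createDataFrame([row], schema=df.schema))
```

For example, `ordered_values(["name", "age"], {"age": 30, "name": "Ann"})` returns `("Ann", 30)`, which is the order a DataFrame with columns `name, age` expects.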

Chapter 6: Working with different types of data
Starting this Python process is expensive, but the real cost is in
serializing the data to Python. This is costly for two reasons: it is an
expensive computation, but also, after the data enters Python, Spark cannot
manage the memory of the worker. This means that you could potentially
cause a worker to fail if it becomes resource constrained (because both the
JVM and Python are competing for memory on the same machine).

Chapter 18: Monitoring and Debugging (as Sean mentioned, this is about the
driver OOM error)
Issues with JVMs running out of memory can happen if you are using another
language binding, such as Python, due to data conversion between the two
requiring too much memory in the JVM. Try to see whether your issue is
specific to your chosen language and bring back less data to the driver
node, or write it to a file instead of bringing it back as in-memory
objects.
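
As a rough illustration of that last piece of advice, the sketch below writes the full result to storage and brings only a bounded preview back to the driver. The function name `offload_result`, the Parquet output format, and the `preview_rows` default are my assumptions, not something the book prescribes; `df` is expected to be a PySpark DataFrame:

```python
def offload_result(df, output_path, preview_rows=1000):
    """Write the full result to storage and return only a small preview."""
    # Executors write their partitions themselves, so the full result
    # never has to fit in driver memory.
    df.write.mode("overwrite").parquet(output_path)
    # Only a bounded number of rows becomes in-memory objects on the driver.
    return df.limit(preview_rows).collect()
```

The files under `output_path` can then be read by whatever consumes the data, instead of calling `collect()` or `toPandas()` on the whole DataFrame.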

Regards,
Gourav Sengupta


On Wed, Nov 3, 2021 at 10:09 PM Sergey Ivanychev <sergeyivanyc...@gmail.com>
wrote:

> I want to further clarify my use case: an ML engineer collects
> data to train an ML model. The driver is created within a
> Jupyter notebook and has 64 GB of RAM for fetching the training set and
> feeding it to the model. Naturally, in this case executors shouldn’t be as
> big as the driver.
>
> Currently, the best solution I found is to write the dataframe to S3, and
> then read it via pd.read_parquet.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On Nov 4, 2021, at 00:18, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
> 
> Thanks for the clarification on the koalas case.
>
> The thread owner states, and I quote: "IIUC, in the `toPandas` case all
> the data gets shuffled to a single executor that fails with OOM ..."
>
> I still believe that this may be related to the way k8s handles shuffling.
> In a balanced k8s cluster this could be avoided, but that does not seem to be
> the case here, as the so-called driver node has 8 times more RAM than the
> other nodes.
>
> HTH
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 3 Nov 2021 at 21:00, Sean Owen <sro...@gmail.com> wrote:
>
>> I think you're talking about koalas, which is in Spark 3.2, but that is
>> unrelated to toPandas() and to the question of how it differs from
>> collect().
>> Shuffle is also unrelated.
>>
>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> As I understand it, in previous versions of Spark data could not be
>>> processed and stored in pandas DataFrames in a distributed mode, because
>>> these DataFrames hold their data in RAM, which here is the driver's.
>>> However, I was under the impression that this limitation no longer
>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>> and 8GB of RAM for the others, and PySpark running in cluster mode, how do you
>>> expect the process to confine itself to the master node? What will happen
>>> if you temporarily increase the executor nodes' RAM to 64GB (a balanced k8s
>>> cluster) and run the job again?
>>>
>>> Worth noting that Spark on k8s currently does not support an external
>>> shuffle service. For now we have two parameters for Dynamic Resource
>>> Allocation. These are
>>>
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>
>>>
>>> The idea is to use dynamic resource allocation where the driver tracks
>>> the shuffle files and evicts only executors not storing active shuffle
>>> files. So in a nutshell, these shuffle files are stored on the executors
>>> themselves in the absence of an external shuffle service. The model works
>>> on the basis of the "one-container-per-Pod" model
>>> <https://kubernetes.io/docs/concepts/workloads/pods/>, meaning that
>>> one node of the cluster runs the driver and each remaining node runs
>>> one executor.
>>>
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
