Re: pyspark join crash
I think the problem is that once unpacked in Python, the objects take considerably more space, since they are stored as Python objects in a Python dictionary. Take a look at python/pyspark/join.py and combineByKey in python/pyspark/rdd.py. We should probably try to store these in serialized form. I'm not sure whether there's a great way to inspect a Python process's memory, but looking at what consumes memory in a reducer process would be useful (a rough sketch of one way to measure this follows below the quoted thread).

Matei

On Jun 4, 2014, at 2:34 PM, Brad Miller wrote:

> Hi Matei,
>
> Thanks for the reply and creating the JIRA. I hear what you're saying, although to be clear I still want to state that it seems like each reduce task is loading significantly more data than just the records needed for that task. The workers seem to load all data from each block containing a record needed by the reduce task.
>
> I base this hypothesis on the following:
> - My dataset is about 100G uncompressed, 22G serialized in memory with compression enabled.
> - There are 130K records.
> - The initial RDD contains 1677 partitions, averaging 60M (uncompressed).
> - There are 3 cores per node (each running one reduce task at a time).
> - Each node has 32G of memory.
>
> Note that I am attempting to join the dataset to itself, and I ran this experiment after caching the dataset in memory with serialization and compression enabled.
>
> Given these figures, even with only 200 partitions the average output partition size (uncompressed) would be 1G (as the dataset is being joined to itself, resulting in 200G over 200 partitions), requiring 3G from each machine on average. The behavior I observe is that the kernel kills processes on many of the nodes at almost exactly the same time, right after the read phase starts; it seems likely this would occur on every node, except that the master begins detecting failures and stops the job (and I observe memory spiking on all machines). Indeed, I observe a large memory spike on each node.
>
> When I attempt the join with 2000 output partitions, it succeeds. Note that there are about 65 records per output partition on average, which means the reader only needs to load input from about 130 blocks (as the dataset is joined to itself). Given that the average uncompressed block size is 60M, even if the entire block were loaded (not just the relevant record) we would expect about 23G of memory to be used per node on average.
>
> I began suspecting the behavior of loading entire blocks based on the logging from the workers (e.g. "BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty blocks out of 3354 blocks"). If it is definitely not the case that entire blocks are loaded from the writers, then it would seem there is some significant overhead chewing through lots of memory (perhaps similar to the problem with Python broadcast variables chewing through memory, https://spark-project.atlassian.net/browse/SPARK-1065).
>
> -Brad
>
> On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia wrote:
>
> > In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks.
> >
> > I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon.
> >
> > Matei
> >
> > On Jun 4, 2014, at 8:23 AM, Brad Miller wrote:
> >
> > > Hi All,
> > >
> > > I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message "Job aborted due to stage failure: Master removed our application: FAILED".
> > >
> > > The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog).
> > >
> > > Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request.
> > >
> > > best,
> > > -Brad
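[Editorial sketch] To illustrate the point about Python object overhead, here is a rough, self-contained measurement of the kind Matei suggests. This is not Spark code: the record shape and counts are made up, and sys.getsizeof only measures shallow sizes, so the estimate is a lower bound on the real footprint of a reducer's in-memory dictionary.

import pickle
import sys

# Hypothetical (key, value) records, purely for illustration.
records = [(i % 1000, "x" * 100) for i in range(100000)]

serialized_bytes = len(pickle.dumps(records, protocol=2))

# Mimic the reduce-side structure: a dict mapping each key to a list of values.
buckets = {}
for k, v in records:
    buckets.setdefault(k, []).append(v)

# Shallow sizes only; ignores object sharing and container internals, so this
# is just a rough lower bound on the real in-memory footprint.
unpacked_bytes = sys.getsizeof(buckets)
unpacked_bytes += sum(
    sys.getsizeof(k) + sys.getsizeof(vs) + sum(sys.getsizeof(v) for v in vs)
    for k, vs in buckets.items()
)

print("pickled: %.1f MB" % (serialized_bytes / 1e6))
print("as Python objects (rough lower bound): %.1f MB" % (unpacked_bytes / 1e6))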
Re: pyspark join crash
Hi Matei,

Thanks for the reply and creating the JIRA. I hear what you're saying, although to be clear I still want to state that it seems like each reduce task is loading significantly more data than just the records needed for that task. The workers seem to load all data from each block containing a record needed by the reduce task.

I base this hypothesis on the following:
- My dataset is about 100G uncompressed, 22G serialized in memory with compression enabled.
- There are 130K records.
- The initial RDD contains 1677 partitions, averaging 60M (uncompressed).
- There are 3 cores per node (each running one reduce task at a time).
- Each node has 32G of memory.

Note that I am attempting to join the dataset to itself, and I ran this experiment after caching the dataset in memory with serialization and compression enabled.

Given these figures, even with only 200 partitions the average output partition size (uncompressed) would be 1G (as the dataset is being joined to itself, resulting in 200G over 200 partitions), requiring 3G from each machine on average. The behavior I observe is that the kernel kills processes on many of the nodes at almost exactly the same time, right after the read phase starts; it seems likely this would occur on every node, except that the master begins detecting failures and stops the job (and I observe memory spiking on all machines). Indeed, I observe a large memory spike on each node.

When I attempt the join with 2000 output partitions, it succeeds. Note that there are about 65 records per output partition on average, which means the reader only needs to load input from about 130 blocks (as the dataset is joined to itself). Given that the average uncompressed block size is 60M, even if the entire block were loaded (not just the relevant record) we would expect about 23G of memory to be used per node on average. (The arithmetic behind these estimates is sketched below the quoted thread.)

I began suspecting the behavior of loading entire blocks based on the logging from the workers (e.g. "BlockFetcherIterator$BasicBlockFetcherIterator: Getting 122 non-empty blocks out of 3354 blocks"). If it is definitely not the case that entire blocks are loaded from the writers, then it would seem there is some significant overhead chewing through lots of memory (perhaps similar to the problem with Python broadcast variables chewing through memory, https://spark-project.atlassian.net/browse/SPARK-1065).

-Brad

On Wed, Jun 4, 2014 at 1:42 PM, Matei Zaharia wrote:

> In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset. Data is spilled to disk across tasks.
>
> I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon.
>
> Matei
>
> On Jun 4, 2014, at 8:23 AM, Brad Miller wrote:
>
> > Hi All,
> >
> > I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message "Job aborted due to stage failure: Master removed our application: FAILED".
> >
> > The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog).
> >
> > Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request.
> >
> > best,
> > -Brad
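[Editorial sketch] The back-of-the-envelope arithmetic behind Brad's estimates, for readers following along. All figures come from the message above; the "whole block is fetched" case is the hypothesis under discussion, not confirmed PySpark behavior.

# Figures taken from the message above.
dataset_gb = 100.0      # uncompressed dataset size
num_records = 130000
block_mb = 60.0         # average uncompressed input block size
tasks_per_node = 3      # one reduce task per core

# Self-join: output is roughly 2x the dataset spread over the output partitions.
for output_partitions in (200, 2000):
    per_partition_gb = 2 * dataset_gb / output_partitions
    per_node_gb = per_partition_gb * tasks_per_node
    print("%d partitions: ~%.1fG per output partition, ~%.1fG per node"
          % (output_partitions, per_partition_gb, per_node_gb))
# -> 200 partitions: ~1.0G per partition, ~3.0G per node (the failing case)
# -> 2000 partitions: ~0.1G per partition, ~0.3G per node

# If the reader pulled in the entire block for every record it needs (the
# hypothesis), the successful 2000-partition run would look like this:
records_per_output_partition = num_records / 2000   # ~65
blocks_touched = 2 * records_per_output_partition   # both sides of the self-join, ~130
per_task_gb = blocks_touched * block_mb / 1024
print("whole-block case: ~%.1fG per task, ~%.1fG per node"
      % (per_task_gb, per_task_gb * tasks_per_node))
# -> ~7.6G per task, ~23G per node, matching the estimate above. The "3354 blocks"
#    in the fetch log is consistent with 2 x 1677 input partitions.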
Re: pyspark join crash
In PySpark, the data processed by each reduce task needs to fit in memory within the Python process, so you should use more tasks to process this dataset (a minimal example is sketched below the quoted message). Data is spilled to disk across tasks.

I’ve created https://issues.apache.org/jira/browse/SPARK-2021 to track this — it’s something we’ve been meaning to look at soon.

Matei

On Jun 4, 2014, at 8:23 AM, Brad Miller wrote:

> Hi All,
>
> I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message "Job aborted due to stage failure: Master removed our application: FAILED".
>
> The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog).
>
> Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request.
>
> best,
> -Brad
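[Editorial sketch] What "use more tasks" looks like in PySpark code: pass a larger numPartitions to join so that each reduce task handles a smaller slice of the data. The RDD below is hypothetical; only the join/numPartitions usage is the point, and 2000 is simply the count that worked in Brad's case.

from pyspark import SparkContext

sc = SparkContext(appName="join-partitions-sketch")

# Hypothetical (key, value) pairs standing in for the real dataset.
pairs = sc.parallelize([(i % 1000, "payload-%d" % i) for i in range(100000)])

# With few output partitions, each reduce task must hold a large share of the
# joined data in the Python worker's memory.
small = pairs.join(pairs, numPartitions=200)

# Spreading the same join over more partitions shrinks each task's footprint.
large = pairs.join(pairs, numPartitions=2000)

print(large.count())
sc.stop()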
pyspark join crash
Hi All,

I have experienced some crashing behavior with join in pyspark. When I attempt a join with 2000 partitions in the result, the join succeeds, but when I use only 200 partitions in the result, the join fails with the message "Job aborted due to stage failure: Master removed our application: FAILED".

The crash always occurs at the beginning of the shuffle phase. Based on my observations, it seems like the workers in the read phase may be fetching entire blocks from the write phase of the shuffle rather than just the records necessary to compose the partition the reader is responsible for. Hence, when there are fewer partitions in the read phase, the worker is likely to need a record from each of the write partitions and consequently attempts to load the entire data set into the memory of a single machine (which then causes the out of memory crash I observe in /var/log/syslog).

Can anybody confirm if this is the behavior of pyspark? I am glad to supply additional details about my observed behavior upon request.

best,
-Brad
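[Editorial sketch] Rough arithmetic for the failure mode described above, using the dataset figures Brad gives in his follow-up higher up the thread (1677 input partitions of ~60M uncompressed, 3 reduce tasks per 32G node). The whole-block fetch is the suspected behavior, not confirmed.

write_partitions = 1677
block_mb = 60.0
node_memory_gb = 32
tasks_per_node = 3

# With only 200 read partitions, each reader is expected to need records from
# nearly every write partition on both sides of the self-join. If entire
# blocks were fetched rather than individual records:
blocks_fetched = 2 * write_partitions            # both sides of the self-join
gb_per_task = blocks_fetched * block_mb / 1024   # ~196G
print("~%.0fG per reduce task vs %dG per node (x%d concurrent tasks)"
      % (gb_per_task, node_memory_gb, tasks_per_node))
# Far beyond a node's memory, which would explain the OOM kills seen in /var/log/syslog.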