GitHub user JoshRosen opened a pull request:
https://github.com/apache/spark/pull/2844
[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
This PR makes several changes to TorrentBroadcast in order to make
it easier to reason about, which should help when debugging SPARK-3958.
The key changes:
- Remove all state from the global TorrentBroadcast object. This state
consisted mainly of configuration options, like the block size and
compression codec, and was read by the blockify / unblockify methods.
Unfortunately, the use of `lazy val` for `BLOCK_SIZE` meant that the block
size was always determined by the first SparkConf that TorrentBroadcast was
initialized with; as a result, unit tests could not properly test
TorrentBroadcast with different block sizes.
Instead, blockifyObject and unBlockifyObject now accept compression codecs
and blockSizes as arguments. These arguments are supplied at the call sites
inside of TorrentBroadcast instances. Each TorrentBroadcast instance
determines these values from SparkEnv's SparkConf. I was careful to ensure
that we do not accidentally serialize CompressionCodec or SparkConf objects
as part of the TorrentBroadcast object.
- Remove special-case handling of local-mode in TorrentBroadcast. I don't
think that broadcast implementations should know whether we're running
in local mode. If we want to optimize the performance of broadcast in local
mode, then we should detect this at a higher level and use a dummy
LocalBroadcastFactory implementation instead.
Removing this code fixes a subtle error condition: in the old local mode
code, a failure to find the broadcast in the local BlockManager would lead
to an attempt to deblockify zero blocks, which could lead to confusing
deserialization or decompression errors when we attempted to decompress
an empty byte array. This should never have happened, though: a failure to
find the block in local mode is evidence of some other error. The changes
here will make it easier to debug those errors if they ever happen.
- Add a check that throws an exception when attempting to deblockify an
empty array.
- Use ScalaCheck to add a test to check that TorrentBroadcast's
blockifyObject and unBlockifyObject methods are inverses.
- Misc. cleanup and logging improvements.
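As a rough illustration of the blockify / unblockify changes listed above,
here is a minimal Python sketch. It is not the Spark code (which is Scala);
the function names, the parameter names, and the choice of pickle and zlib
as serializer and codec are all assumptions made for this example. It shows
the block size and codec being passed in as arguments rather than read from
global state, the check that rejects an empty list of blocks, and the
round-trip property that the ScalaCheck test is meant to verify.

```python
import pickle
import zlib


def blockify(obj, block_size, compress=zlib.compress):
    """Serialize obj, compress it, and split the bytes into block_size chunks.

    block_size and compress are explicit arguments (hypothetical names),
    so nothing is cached in module-level state between calls.
    """
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    data = compress(pickle.dumps(obj))
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def unblockify(blocks, decompress=zlib.decompress):
    """Inverse of blockify: join the blocks, decompress, and deserialize."""
    if not blocks:
        # Fail early with a clear message instead of handing an empty
        # byte string to the decompressor.
        raise ValueError("cannot unblockify an empty list of blocks")
    return pickle.loads(decompress(b"".join(blocks)))


if __name__ == "__main__":
    value = {"ids": list(range(1000)), "name": "broadcast"}
    # Round trip with several block sizes, including a very small one.
    for size in (1, 7, 4096):
        assert unblockify(blockify(value, size)) == value
```

Because each call receives its own block size, a test can exercise several
sizes in the same process, which is what the old `lazy val` prevented.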
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark torrentbroadcast-bugfix
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/2844.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2844
----
commit 48c98c1996c87cebbd0669924f57527b8e81c35e
Author: Josh Rosen <[email protected]>
Date: 2014-10-19T06:36:49Z
[SPARK-3958] TorrentBroadcast cleanup / debugging improvements.
----