GitHub user witgo opened a pull request:
https://github.com/apache/spark/pull/14995
[Test Only][not ready for review][SPARK-6235][CORE] Address various 2G limits
## What changes were proposed in this pull request?
### Motivation
Spark has various 2G limits.
1. When reading a data block that is stored on disk, the following code fragment is called:
```scala
val iterToReturn: Iterator[Any] = {
  val diskBytes = diskStore.getBytes(blockId)
  if (level.deserialized) {
    val diskValues = serializerManager.dataDeserializeStream(
      blockId,
      diskBytes.toInputStream(dispose = true))(info.classTag)
    maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
  } else {
    val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
      .map { _.toInputStream(dispose = false) }
      .getOrElse { diskBytes.toInputStream(dispose = true) }
    serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
  }
}
```
```scala
def getBytes(blockId: BlockId): ChunkedByteBuffer = {
  val file = diskManager.getFile(blockId.name)
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    // For small files, directly read rather than memory map
    if (file.length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(file.length.toInt)
      channel.position(0)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      new ChunkedByteBuffer(buf)
    } else {
      new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    }
  } {
    channel.close()
  }
}
```
The above code has the following problems:
* `channel.map(MapMode.READ_ONLY, 0, file.length)` returns a `MappedByteBuffer`, whose size cannot exceed 2G.
* Generating the `Iterator[Any]` requires loading all of the data into memory, which may consume a large amount of memory.
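For comparison, a file of arbitrary size can be read into several fixed-size buffers rather than one `MappedByteBuffer`; this is the direction the chunked design below takes. The following is only an illustrative sketch, and `readFileInChunks` with its 64 MB chunk size is a hypothetical helper, not the PR's code.
```scala
import java.io.{File, IOException}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: read a file of any length into <= 64 MB heap buffers,
// so no single buffer is subject to the 2G MappedByteBuffer limit.
def readFileInChunks(file: File, chunkSize: Int = 64 * 1024 * 1024): Array[ByteBuffer] = {
  val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
  try {
    val chunks = ArrayBuffer.empty[ByteBuffer]
    var remaining = channel.size()
    while (remaining > 0) {
      val size = math.min(remaining, chunkSize.toLong).toInt
      val buf = ByteBuffer.allocate(size)
      while (buf.hasRemaining) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer")
        }
      }
      buf.flip()
      chunks += buf
      remaining -= size
    }
    chunks.toArray
  } finally {
    channel.close()
  }
}
```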
2. When serializing data with Kryo, the following code fragment is called:
```scala
override def serialize[T: ClassTag](t: T): ByteBuffer = {
  output.clear()
  val kryo = borrowKryo()
  try {
    kryo.writeClassAndObject(output, t)
  } catch {
    case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
      throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
        "increase spark.kryoserializer.buffer.max value.")
  } finally {
    releaseKryo(kryo)
  }
  ByteBuffer.wrap(output.toBytes)
}
```
The above code has the following problem:
* The serialized data is stored in `output`'s internal `byte[]`, and the size of a `byte[]` cannot exceed 2G.
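For reference, Kryo's `Output` can also write through an `OutputStream`, so serialized bytes need not accumulate in a single `byte[]`. The PR applies this idea with a chunked output stream (see the Design section); the sketch below is only a minimal illustration and `serializeToStream` is a hypothetical helper.
```scala
import java.io.OutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

// Hypothetical helper: stream Kryo output to `out` through a small internal
// buffer instead of collecting all bytes in one array.
def serializeToStream[T](kryo: Kryo, value: T, out: OutputStream): Unit = {
  val output = new Output(out, 64 * 1024) // 64 KB buffer, flushed to `out` as it fills
  kryo.writeClassAndObject(output, value)
  output.close() // flushes the remaining bytes and closes the underlying stream
}
```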
3. When the RPC layer writes outgoing data to the channel, the following code fragment is called:
```java
public long transferTo(final WritableByteChannel target, final long position) throws IOException {
  Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
  // Bytes written for header in this call.
  long writtenHeader = 0;
  if (header.readableBytes() > 0) {
    writtenHeader = copyByteBuf(header, target);
    totalBytesTransferred += writtenHeader;
    if (header.readableBytes() > 0) {
      return writtenHeader;
    }
  }
  // Bytes written for body in this call.
  long writtenBody = 0;
  if (body instanceof FileRegion) {
    writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
  } else if (body instanceof ByteBuf) {
    writtenBody = copyByteBuf((ByteBuf) body, target);
  }
  totalBytesTransferred += writtenBody;
  return writtenHeader + writtenBody;
}
```
The above code has the following problems:
* The size of a `ByteBuf` cannot exceed 2G.
* In-memory data larger than 2G cannot be transferred.
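For illustration, once a large in-memory payload is split into multiple buffers, it can be written to the channel chunk by chunk, so no single buffer has to exceed 2G. This is only a sketch of the idea, not the PR's code; `transferChunks` is a hypothetical helper.
```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Hypothetical helper: write a body made of several ByteBuffer chunks to a
// channel. Each chunk is at most Integer.MAX_VALUE bytes, but the total
// payload can be arbitrarily large.
def transferChunks(chunks: Seq[ByteBuffer], target: WritableByteChannel): Long = {
  var written = 0L
  chunks.foreach { chunk =>
    while (chunk.hasRemaining) {
      written += target.write(chunk)
    }
  }
  written
}
```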
4. When decoding a received RPC message, the following code fragment is called:
```java
public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);

  @Override
  public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    Message.Type msgType = Message.Type.decode(in);
    Message decoded = decode(msgType, in);
    assert decoded.type() == msgType;
    logger.trace("Received message {}: {}", msgType, decoded);
    out.add(decoded);
  }

  private Message decode(Message.Type msgType, ByteBuf in) {
    switch (msgType) {
      case ChunkFetchRequest:
        return ChunkFetchRequest.decode(in);
      case ChunkFetchSuccess:
        return ChunkFetchSuccess.decode(in);
      case ChunkFetchFailure:
        return ChunkFetchFailure.decode(in);
      default:
        throw new IllegalArgumentException("Unexpected message type: " + msgType);
    }
  }
}
```
The above code has the following problems:
* The size of a `ByteBuf` cannot exceed 2G.
* The receiver must buffer the complete frame before the data can be decoded.
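To illustrate the streaming direction the design below takes, a receiver that sees the frame as an `InputStream` can decode the fixed-size header fields without first buffering the whole body. The layout and names here (`FrameHeader`, `readHeader`) are hypothetical, not the PR's actual decode API.
```scala
import java.io.{DataInputStream, InputStream}

// Hypothetical frame layout: a one-byte message type tag followed by the body
// length; the body itself can then be consumed incrementally as a stream.
case class FrameHeader(msgTypeId: Byte, bodyLength: Long)

def readHeader(in: InputStream): FrameHeader = {
  val data = new DataInputStream(in)
  FrameHeader(data.readByte(), data.readLong())
}
```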
### Goals
* Set up the groundwork for eliminating the various 2G limits in Spark. (2G limits 1, 2, 3, 4)
* Support back-pressure flow control for remote data reads (experimental goal). (2G limit 4)
* Add a buffer pool (long-term goal).
### Design
#### Setup for eliminating the various 2G limits in Spark
##### Replace ByteBuffer with ChunkedByteBuffer. (2G limits 1, 2)
* Support reference counting, a prerequisite for the buffer pool feature (see [Reference counted objects](http://netty.io/wiki/reference-counted-objects.html)).
* Support serialization for easy transport.
* Support slice, duplicate and copy operations.
* Be efficiently convertible to `InputStream`, `ByteBuffer`, `byte[]`, `ByteBuf`, etc. (a minimal sketch of such a structure follows this list).
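A minimal sketch of the chunked-buffer idea, assuming the payload is simply held as an array of `ByteBuffer`s; this is not the PR's ChunkedByteBuffer (reference counting is omitted, and the copy made in `toInputStream` is only for brevity).
```scala
import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.nio.ByteBuffer
import scala.collection.JavaConverters._

// Sketch only: the payload is split across several ByteBuffers, so the total
// size is a Long and no single chunk has to exceed Integer.MAX_VALUE bytes.
class SimpleChunkedBuffer(val chunks: Array[ByteBuffer]) {
  def size: Long = chunks.map(_.remaining().toLong).sum

  // Duplicated views share the underlying data, as slice/duplicate would.
  def duplicate(): SimpleChunkedBuffer =
    new SimpleChunkedBuffer(chunks.map(_.duplicate()))

  // Converting to an InputStream concatenates one stream per chunk.
  def toInputStream: InputStream = {
    val streams = chunks.iterator.map { buf =>
      val copy = buf.duplicate()
      val bytes = new Array[Byte](copy.remaining())
      copy.get(bytes)
      new ByteArrayInputStream(bytes): InputStream
    }
    new SequenceInputStream(streams.asJavaEnumeration)
  }
}
```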
1. Move the ChunkedByteBuffer class to `common/network-common/src/main/java/org/apache/spark/network/buffer/`.
2. Change the return type of `ManagedBuffer.nioByteBuffer` to ChunkedByteBuffer. (2G limit 1)
3. Change the parameter of `SerializerInstance.deserialize` and the return value of `SerializerInstance.serialize` to ChunkedByteBuffer, for example: (2G limit 2)
```scala
def serialize[T: ClassTag](t: T): ChunkedByteBuffer = {
  output.clear()
  val out = ChunkedByteBufferOutputStream.newInstance()
  output.setOutputStream(out)
  val kryo = borrowKryo()
  kryo.writeClassAndObject(output, t)
  output.close()
  out.toChunkedByteBuffer
}
```
4. Other changes.
##### Replace ByteBuf with InputStream.
1. Modify `NioManagedBuffer.convertToNetty` to return an InputStream instance when the data is larger than Integer.MAX_VALUE. (2G limit 3)
2. Add an InputStreamManagedBuffer class that wraps an InputStream instance as a ManagedBuffer. (2G limit 4)
3. Modify the MessageWithHeader class to support InputStream instances. (2G limit 3)
4. Change the parameter of the `Encodable.encode` method to an OutputStream instance. (2G limit 4)
5. Change the parameter of the `decode` method of the classes implementing the Encodable interface to an InputStream instance, so that data mixed between memory and disk can be handled, for example: (2G limit 3)
```java
public InputStream toInputStream() throws IOException {
  ChunkedByteBufferOutputStream out = ChunkedByteBufferOutputStream.newInstance();
  Encoders.Bytes.encode(out, type().id());
  encodeWithoutBlockData(out);
  // out.toChunkedByteBuffer().toInputStream(): data in memory
  // blockData.createInputStream(): data on disk (FileInputStream)
  return new SequenceInputStream(
    out.toChunkedByteBuffer().toInputStream(),
    blockData.createInputStream());
}
```
6. Modify the TransportFrameDecoder class to represent a frame as a `LinkedList<ByteBuf>`, removing the frame size limit. (2G limit 4)
7. Add a ByteBufInputStream class that exposes a `LinkedList<ByteBuf>` instance as an InputStream (a rough sketch follows this list). (2G limit 4)
8. Change the parameter of the `RpcHandler.receive` method to an InputStream instance. (2G limit 4)
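A rough sketch of the shape item 7 could take, under the assumption that the decoded frame arrives as a list of Netty buffers; this is not the PR's actual ByteBufInputStream.
```scala
import java.io.InputStream
import java.util.LinkedList
import io.netty.buffer.ByteBuf

// Sketch only: expose the frame's chunks as one InputStream and release each
// ByteBuf once it has been fully consumed, keeping reference counts balanced.
class SimpleByteBufInputStream(buffers: LinkedList[ByteBuf]) extends InputStream {
  override def read(): Int = {
    val current = currentBuf()
    if (current == null) -1 else current.readByte() & 0xFF
  }

  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
    val current = currentBuf()
    if (current == null) {
      -1
    } else {
      val toRead = math.min(length, current.readableBytes())
      current.readBytes(dest, offset, toRead)
      toRead
    }
  }

  // Drop exhausted chunks from the head of the list and return the next
  // readable one, or null when the whole frame has been consumed.
  private def currentBuf(): ByteBuf = {
    while (!buffers.isEmpty && !buffers.getFirst.isReadable) {
      buffers.removeFirst().release()
    }
    if (buffers.isEmpty) null else buffers.getFirst
  }
}
```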
## How was this patch tested?
TODO
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/witgo/spark SPARK-6235_Address_various_2G_limits
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/14995.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #14995
----
commit 60aa562901900b81e03f0dc501d3c555d690e42a
Author: Guoqiang Li <[email protected]>
Date: 2016-09-06T13:35:00Z
Address various 2G limits
commit 8a80539846a10b0e4784e9f269eb3ce370fcded4
Author: Guoqiang Li <[email protected]>
Date: 2016-09-07T11:31:27Z
review commits
----