GitHub user witgo opened a pull request:

    https://github.com/apache/spark/pull/14995

    [Test Only][not ready for review][SPARK-6235][CORE] Address various 2G limits

    ## What changes were proposed in this pull request?
    
    ### Motivation
    
    There are various 2G limits in Spark:
    
    1. When reading a data block that is stored on disk, the following code fragment is called:
    
      ```scala
        val iterToReturn: Iterator[Any] = {
          val diskBytes = diskStore.getBytes(blockId)
          if (level.deserialized) {
            val diskValues = serializerManager.dataDeserializeStream(
              blockId,
              diskBytes.toInputStream(dispose = true))(info.classTag)
            maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
          } else {
            val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
              .map {_.toInputStream(dispose = false)}
              .getOrElse { diskBytes.toInputStream(dispose = true) }
            serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
          }
        }
      ```
    
      ```scala
        def getBytes(blockId: BlockId): ChunkedByteBuffer = {
          val file = diskManager.getFile(blockId.name)
          val channel = new RandomAccessFile(file, "r").getChannel
          Utils.tryWithSafeFinally {
            // For small files, directly read rather than memory map
            if (file.length < minMemoryMapBytes) {
              val buf = ByteBuffer.allocate(file.length.toInt)
              channel.position(0)
              while (buf.remaining() != 0) {
                if (channel.read(buf) == -1) {
                  throw new IOException("Reached EOF before filling buffer\n" +
                    s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
                }
              }
              buf.flip()
              new ChunkedByteBuffer(buf)
            } else {
              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
            }
          } {
            channel.close()
          }
        }
      ```
    
      The above code has the following problems:
        * `channel.map(MapMode.READ_ONLY, 0, file.length)` returns a `MappedByteBuffer`, and the size of a `MappedByteBuffer` cannot exceed 2G.
        * Producing the `Iterator[Any]` requires loading all of the data into memory, which may consume a large amount of memory.
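
      For illustration only (not part of this patch): `FileChannel.map` itself rejects regions larger than `Integer.MAX_VALUE`, so a block file over 2G can never be read through a single `MappedByteBuffer`. A minimal sketch:

      ```scala
        import java.io.RandomAccessFile
        import java.nio.channels.FileChannel.MapMode

        // Attempt to memory-map an entire file as one MappedByteBuffer.
        // For files larger than Integer.MAX_VALUE bytes, FileChannel.map
        // throws an IllegalArgumentException because the mapped size must
        // fit in an Int.
        def mapWholeFile(path: String) = {
          val channel = new RandomAccessFile(path, "r").getChannel
          try {
            channel.map(MapMode.READ_ONLY, 0, channel.size())
          } finally {
            channel.close()
          }
        }
      ```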
    
    
    2. When serializing data with Kryo, the following code fragment is called:
    
      ```scala
        override def serialize[T: ClassTag](t: T): ByteBuffer = {
          output.clear()
          val kryo = borrowKryo()
          try {
            kryo.writeClassAndObject(output, t)
          } catch {
            case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
              throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
                "increase spark.kryoserializer.buffer.max value.")
          } finally {
            releaseKryo(kryo)
          }
          ByteBuffer.wrap(output.toBytes)
        }
      ```
    
      The above code has the following problems:
        * The serialized data is stored in `output`'s internal `byte[]`, and the size of a `byte[]` cannot exceed 2G.
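
      For illustration only (not part of this patch): Kryo's `Output` can also write through an `OutputStream` instead of accumulating everything in one internal `byte[]`; the design below builds on this. A minimal sketch (the 64 KB buffer size and the `ByteArrayOutputStream` sink are just placeholders):

      ```scala
        import java.io.ByteArrayOutputStream
        import com.esotericsoftware.kryo.Kryo
        import com.esotericsoftware.kryo.io.Output

        val kryo = new Kryo()
        kryo.setRegistrationRequired(false)

        // Point Output at an OutputStream: serialized bytes are flushed out of
        // the small internal buffer as they are produced, instead of growing a
        // single byte[] that is capped at Integer.MAX_VALUE.
        val sink = new ByteArrayOutputStream()   // stand-in for a chunked sink
        val output = new Output(sink, 64 * 1024) // 64 KB internal buffer
        kryo.writeClassAndObject(output, "some value")
        output.close()                           // flush remaining bytes to sink
      ```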
    3. When the RPC layer writes outgoing data to the channel, the following code fragment is called:
      ```java
        public long transferTo(final WritableByteChannel target, final long position) throws IOException {
          Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
          // Bytes written for header in this call.
          long writtenHeader = 0;
          if (header.readableBytes() > 0) {
            writtenHeader = copyByteBuf(header, target);
            totalBytesTransferred += writtenHeader;
            if (header.readableBytes() > 0) {
              return writtenHeader;
            }
          }

          // Bytes written for body in this call.
          long writtenBody = 0;
          if (body instanceof FileRegion) {
            writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
          } else if (body instanceof ByteBuf) {
            writtenBody = copyByteBuf((ByteBuf) body, target);
          }
          totalBytesTransferred += writtenBody;
          return writtenHeader + writtenBody;
        }
      ```
    
      The above code has the following problems:
        * The size of a `ByteBuf` cannot exceed 2G.
        * Data held in memory (rather than in a `FileRegion`) cannot be transferred if it is larger than 2G.
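
      For illustration only (not part of this patch): the general way around the in-memory limit is to stream the body to the channel in bounded chunks instead of holding it in a single `ByteBuf`, which is roughly the direction the design below takes. A minimal sketch:

      ```scala
        import java.io.InputStream
        import java.nio.ByteBuffer
        import java.nio.channels.WritableByteChannel

        // Copy an arbitrarily large InputStream to a channel through a small,
        // reusable buffer, so no single in-memory buffer ever needs to hold
        // anywhere near 2G.
        def copyStream(in: InputStream, target: WritableByteChannel): Long = {
          val buf = new Array[Byte](64 * 1024)
          var total = 0L
          var n = in.read(buf)
          while (n != -1) {
            val bb = ByteBuffer.wrap(buf, 0, n)
            while (bb.hasRemaining) {
              target.write(bb)
            }
            total += n
            n = in.read(buf)
          }
          total
        }
      ```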
    
        
    4. When decoding a received RPC message, the following code fragment is called:
    
      ```java
      public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

        private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);

        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
          Message.Type msgType = Message.Type.decode(in);
          Message decoded = decode(msgType, in);
          assert decoded.type() == msgType;
          logger.trace("Received message {}: {}", msgType, decoded);
          out.add(decoded);
        }

        private Message decode(Message.Type msgType, ByteBuf in) {
          switch (msgType) {
            case ChunkFetchRequest:
              return ChunkFetchRequest.decode(in);

            case ChunkFetchSuccess:
              return ChunkFetchSuccess.decode(in);

            case ChunkFetchFailure:
              return ChunkFetchFailure.decode(in);

            default:
              throw new IllegalArgumentException("Unexpected message type: " + msgType);
          }
        }
      }
      ```
    
      The above code has the following problems:
        * The size of a `ByteBuf` cannot exceed 2G.
        * The receiver must have buffered the complete message before it can be decoded.
    
    ### Goals
      
    * Lay the groundwork for eliminating the various 2G limits in Spark. (The 2G limits 1, 2, 3, 4)
    * Support back-pressure flow control for remote data reads (experimental goal). (The 2G limit 4)
    * Add a buffer pool (long-range goal).
    
    ### Design
    
    #### Setup for eliminating the various 2G limits in Spark.

    ##### Replace ByteBuffer with ChunkedByteBuffer. (The 2G limits 1, 2)
      * Supports reference counting, a prerequisite for the buffer pool feature (see [Reference counted objects](http://netty.io/wiki/reference-counted-objects.html)).
      * Supports serialization, for easy transport.
      * Supports slice, duplicate and copy operations.
      * Can be efficiently converted to `InputStream`, `ByteBuffer`, `byte[]`, `ByteBuf`, etc.
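
    For illustration, a rough sketch of the idea (the names `SimpleChunkedByteBuffer` and `chunks` are assumptions, not the PR's API): a chunked buffer is an ordered sequence of `ByteBuffer` chunks, so its total size is a `Long` rather than an `Int`, and it can be exposed as an `InputStream` without copying:

    ```scala
    import java.io.{InputStream, SequenceInputStream}
    import java.nio.ByteBuffer
    import scala.collection.JavaConverters._

    // Hypothetical illustration of a chunked byte buffer.
    class SimpleChunkedByteBuffer(val chunks: Array[ByteBuffer]) {

      // Total size across all chunks; not limited by Int.MaxValue.
      def size: Long = chunks.map(_.remaining().toLong).sum

      // Present the chunks as one logical InputStream without copying them.
      def toInputStream: InputStream = {
        val streams: Array[InputStream] = chunks.map { c =>
          val dup = c.duplicate()
          new InputStream {
            override def read(): Int = if (dup.hasRemaining) dup.get() & 0xFF else -1
          }
        }
        new SequenceInputStream(streams.iterator.asJavaEnumeration)
      }
    }
    ```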
    
    1. Move the ChunkedByteBuffer class to `common/network-common/src/main/java/org/apache/spark/network/buffer/`.
    2. Change the return value of `ManagedBuffer.nioByteBuffer` to a ChunkedByteBuffer instance. (The 2G limit 1)
    3. Change the parameter of `SerializerInstance.deserialize` and the return value of `SerializerInstance.serialize` to ChunkedByteBuffer instances, as in the fragment below (a sketch of a possible `ChunkedByteBufferOutputStream` follows this list). (The 2G limit 2)
    
      ```scala
      def serialize[T: ClassTag](t: T): ChunkedByteBuffer = {
        output.clear()
        val out = ChunkedByteBufferOutputStream.newInstance()
        output.setOutputStream(out)
        val kryo = borrowKryo()
        try {
          kryo.writeClassAndObject(output, t)
          output.close()
        } finally {
          // Return the Kryo instance to the pool, as in the existing serialize().
          releaseKryo(kryo)
        }
        out.toChunkedByteBuffer
      }
      ```
    
    4. Other changes. 
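
    `ChunkedByteBufferOutputStream` above is part of the proposed change and not shown in this description; as a rough sketch of the idea (the class and method names here are assumptions), it accumulates written bytes into fixed-size chunks instead of one contiguous `byte[]`:

    ```scala
    import java.io.OutputStream
    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical sketch: collect output into fixed-size chunks so the total
    // amount written is not bounded by the 2G limit of a single array.
    class SimpleChunkedByteBufferOutputStream(chunkSize: Int = 4 * 1024 * 1024)
      extends OutputStream {

      private val chunks = ArrayBuffer[Array[Byte]]()
      private var position = chunkSize  // forces allocation on the first write

      override def write(b: Int): Unit = {
        if (position == chunkSize) {
          chunks += new Array[Byte](chunkSize)
          position = 0
        }
        chunks.last(position) = b.toByte
        position += 1
      }

      // Wrap the chunks without copying; the last chunk is trimmed to `position`.
      def toChunks: Array[ByteBuffer] = chunks.zipWithIndex.map { case (chunk, i) =>
        val len = if (i == chunks.size - 1) position else chunkSize
        ByteBuffer.wrap(chunk, 0, len)
      }.toArray
    }
    ```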
    
    ##### Replace ByteBuf with InputStream. 
    
    1. Modify the `NioManagedBuffer.convertToNetty` method to return an InputStream instance when the data is larger than Integer.MAX_VALUE. (The 2G limit 3)
    2. Add an InputStreamManagedBuffer class, used to convert an InputStream instance to a ManagedBuffer instance. (The 2G limit 4)
    3. Modify the MessageWithHeader class to support processing InputStream instances. (The 2G limit 3)
    4. Modify the parameter of the `Encodable.encode` method to an OutputStream instance. (The 2G limit 4)
    5. Modify the parameters of the decode methods of the classes that implement the Encodable interface to InputStream instances, so that data stored partly in memory and partly on disk can be handled, as in the fragment below. (The 2G limit 3)
      
      ```java
      public InputStream toInputStream() throws IOException {
        ChunkedByteBufferOutputStream out = ChunkedByteBufferOutputStream.newInstance();
        Encoders.Bytes.encode(out, type().id());
        encodeWithoutBlockData(out);
        // out.toChunkedByteBuffer().toInputStream(): data in memory
        // blockData.createInputStream(): data on the hard disk (FileInputStream)
        return new SequenceInputStream(out.toChunkedByteBuffer().toInputStream(),
            blockData.createInputStream());
      }
      ```
    
    6. Modify the TransportFrameDecoder class to use a `LinkedList<ByteBuf>` to represent a frame, removing the size limit on frames. (The 2G limit 4)
    7. Add a ByteBufInputStream class, used to convert a `LinkedList<ByteBuf>` instance to an InputStream instance (a sketch of the idea follows this list). (The 2G limit 4)
    8. Modify the parameters of the `RpcHandler.receive` method to InputStream instances. (The 2G limit 4)
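
    `ByteBufInputStream` in item 7 is likewise part of the proposed change; as a rough sketch of the idea (names and details are assumptions), it reads through a list of Netty `ByteBuf` frame chunks in order, releasing each one as it is exhausted:

    ```scala
    import java.io.InputStream
    import java.util.LinkedList
    import io.netty.buffer.ByteBuf

    // Hypothetical sketch: present a LinkedList of ByteBuf chunks as a single
    // InputStream, so a frame larger than 2G never has to fit in one ByteBuf.
    class SimpleByteBufInputStream(buffers: LinkedList[ByteBuf]) extends InputStream {

      override def read(): Int = {
        val buf = current()
        if (buf == null) -1 else buf.readByte() & 0xFF
      }

      // Return the first ByteBuf that still has readable bytes, releasing
      // exhausted ones along the way; null once everything has been consumed.
      private def current(): ByteBuf = {
        while (!buffers.isEmpty && !buffers.peekFirst().isReadable) {
          buffers.pollFirst().release()
        }
        buffers.peekFirst()
      }
    }
    ```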
    
    
    ## How was this patch tested?
    
    TODO
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/witgo/spark SPARK-6235_Address_various_2G_limits

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14995.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14995
    
----
commit 60aa562901900b81e03f0dc501d3c555d690e42a
Author: Guoqiang Li <[email protected]>
Date:   2016-09-06T13:35:00Z

    Address various 2G limits

commit 8a80539846a10b0e4784e9f269eb3ce370fcded4
Author: Guoqiang Li <[email protected]>
Date:   2016-09-07T11:31:27Z

    review commits

----

