GitHub user witgo opened a pull request:

    https://github.com/apache/spark/pull/14662

    [WIP][SPARK-17082][CORE] Replace ByteBuffer with ChunkedByteBuffer

    ## What changes were proposed in this pull request?
    
     Replace `ByteBuffer` with `ChunkedByteBuffer`:
    
    * ManagedBuffer
        `public abstract ByteBuffer nioByteBuffer() throws IOException;` => 
`public abstract ChunkedByteBuffer nioByteBuffer() throws IOException;`
    * FileSegmentManagedBuffer
        add constructor `public FileSegmentManagedBuffer(long memoryMapBytes, 
boolean lazyFileDescriptor, File file, long offset, long length)`  
    
    * NettyManagedBuffer 
        Support zero-copy in the `nioByteBuffer` method
    
    * NioManagedBuffer  
        add constructor `public NioManagedBuffer(ChunkedByteBuffer buf)`
    
    * RpcResponseCallback
        `void onSuccess(ByteBuffer response)` => `void 
onSuccess(ChunkedByteBuffer response)`
    
    * TransportClient
        `public long sendRpc(ByteBuffer message, final RpcResponseCallback 
callback)` => `public long sendRpc(ChunkedByteBuffer message, final 
RpcResponseCallback callback)`
        `public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs)` => 
`public ChunkedByteBuffer sendRpcSync(ChunkedByteBuffer message, long timeoutMs)`
 
        `public void send(ByteBuffer message)` => `public void 
send(ChunkedByteBuffer message)`
    
    * SaslRpcHandler
        `public void receive(TransportClient client, ByteBuffer message, 
RpcResponseCallback callback)` => `public void receive(TransportClient client, 
ChunkedByteBuffer message, RpcResponseCallback callback)`
        `public void receive(TransportClient client, ByteBuffer message)` => 
`public void receive(TransportClient client, ChunkedByteBuffer message)`
    * NoOpRpcHandler
        ` public void receive(TransportClient client, ByteBuffer message, 
RpcResponseCallback callback)` => `public void receive(TransportClient client, 
ChunkedByteBuffer message, RpcResponseCallback callback)`      
    
    * RpcHandler
        `public abstract void receive(TransportClient client, ByteBuffer 
message, RpcResponseCallback callback)` => `public abstract void 
receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback 
callback)`
        `public void receive(TransportClient client, ByteBuffer message)` => 
`public void receive(TransportClient client, ChunkedByteBuffer message)` 
        `public void onSuccess(ByteBuffer response)` => `public void 
onSuccess(ChunkedByteBuffer response)`
    
    * org.apache.spark.network.shuffle.protocol.Decoder
        `public static BlockTransferMessage fromByteBuffer(ByteBuffer msg)` => 
`public static BlockTransferMessage fromByteBuffer(ChunkedByteBuffer msg)`
    
    * TorrentBroadcast
        `def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer], 
serializer: Serializer, compressionCodec: Option[CompressionCodec])` =>
        `def unBlockifyObject[T: ClassTag](blocks: Array[ChunkedByteBuffer], 
serializer: Serializer, compressionCodec: Option[CompressionCodec])`
    
    * Executor
        `def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber: 
Int, taskName: String, serializedTask: ByteBuffer): Unit` =>
        `def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber: 
Int, taskName: String, serializedTask: ChunkedByteBuffer): Unit`
    
    * ExecutorBackend
        `def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): 
Unit` => `def statusUpdate(taskId: Long, state: TaskState, data: 
ChunkedByteBuffer): Unit` 
    
    * OneWayOutboxMessage
        `case class OneWayOutboxMessage(content: ByteBuffer) extends 
OutboxMessage` => `case class OneWayOutboxMessage(content: ChunkedByteBuffer) 
extends OutboxMessage`
    
    * org.apache.spark.scheduler.Task   
        `serializeWithDependencies(task: Task[_], currentFiles: 
mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer: 
SerializerInstance): ByteBuffer` =>
        `serializeWithDependencies(task: Task[_], currentFiles: 
mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer: 
SerializerInstance): ChunkedByteBuffer`
        `deserializeWithDependencies(serializedTask: ByteBuffer): 
(HashMap[String, Long], HashMap[String, Long], Properties, ByteBuffer)` =>
        `deserializeWithDependencies(serializedTask: ChunkedByteBuffer): 
(HashMap[String, Long], HashMap[String, Long], Properties, ChunkedByteBuffer)`
    
    * TaskDescription
        `private[spark] class TaskDescription(val taskId: Long, val 
attemptNumber: Int, val executorId: String, val name: String, val index: Int, 
val serializedTask: ByteBuffer)` => `private[spark] class 
TaskDescription(val taskId: Long, val attemptNumber: Int, val executorId: 
String, val name: String, val index: Int, _serializedTask: ChunkedByteBuffer)`
    
    * DirectTaskResult
        `private[spark] class DirectTaskResult[T](var valueBytes: ByteBuffer, 
var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T]`=>
        `private[spark] class DirectTaskResult[T](var valueBytes: 
ChunkedByteBuffer, var accumUpdates: Seq[AccumulatorV2[_, _]]) extends 
TaskResult[T]`
    
    * TaskResultGetter
        `def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, 
serializedData: ByteBuffer): Unit` =>`def enqueueSuccessfulTask(taskSetManager: 
TaskSetManager, tid: Long, serializedData: ChunkedByteBuffer): Unit`
        `def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, 
taskState: TaskState, serializedData: ByteBuffer)` => `def 
enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: 
TaskState, serializedData: ChunkedByteBuffer)`
    
    * TaskSchedulerImpl
        `def statusUpdate(tid: Long, state: TaskState, serializedData: 
ByteBuffer)` =>  `def statusUpdate(tid: Long, state: TaskState, serializedData: 
ChunkedByteBuffer)`
    
    * CoarseGrainedClusterMessages.LaunchTask
        `case class LaunchTask(data: SerializableBuffer) extends 
CoarseGrainedClusterMessage` => `case class LaunchTask(data: ChunkedByteBuffer) 
extends CoarseGrainedClusterMessage`
    
    * CoarseGrainedClusterMessages.StatusUpdate
        `case class StatusUpdate(executorId: String, taskId: Long, state: 
TaskState, data: SerializableBuffer) extends CoarseGrainedClusterMessage` => 
`case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, 
data: ChunkedByteBuffer) extends CoarseGrainedClusterMessage`
    * org.apache.spark.scheduler.local.LocalSchedulerBackend    
        `private case class StatusUpdate(taskId: Long, state: TaskState, 
serializedData: ByteBuffer)` => `private case class StatusUpdate(taskId: Long, 
state: TaskState, serializedData: ChunkedByteBuffer)`
    
    * SerializerInstance
        `def serialize[T: ClassTag](t: T): ByteBuffer` => `def serialize[T: 
ClassTag](t: T): ChunkedByteBuffer`
        `def deserialize[T: ClassTag](bytes: ByteBuffer): T` => `def 
deserialize[T: ClassTag](bytes: ChunkedByteBuffer): T`
        `def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): 
T` => `def deserialize[T: ClassTag](bytes: ChunkedByteBuffer, loader: 
ClassLoader): T`
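    For context on why every `ByteBuffer` in these signatures becomes a 
`ChunkedByteBuffer`: a chunked buffer holds its payload as several smaller 
`ByteBuffer` chunks rather than one contiguous buffer, so the total payload is 
not bounded by the 2 GB limit of a single `ByteBuffer` and no large contiguous 
allocation is required. The Java sketch below is a minimal, hypothetical 
illustration of that idea only; the class and method names are illustrative 
and this is not Spark's actual `ChunkedByteBuffer` implementation.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: data lives in an array of ByteBuffer chunks instead
// of one contiguous ByteBuffer, so the total size can exceed a single
// buffer's Integer.MAX_VALUE capacity limit.
public class ChunkedBufferSketch {
    private final ByteBuffer[] chunks;

    public ChunkedBufferSketch(ByteBuffer... chunks) {
        this.chunks = chunks;
    }

    // Total size across all chunks; a long, unlike ByteBuffer.remaining().
    public long size() {
        long total = 0;
        for (ByteBuffer chunk : chunks) {
            total += chunk.remaining();
        }
        return total;
    }

    // Copies all chunks into one array; only valid when size() fits in an int.
    public byte[] toArray() {
        byte[] out = new byte[(int) size()];
        int pos = 0;
        for (ByteBuffer chunk : chunks) {
            // duplicate() shares the data but not the position, so reading
            // here does not disturb the original chunk's cursor.
            ByteBuffer dup = chunk.duplicate();
            int n = dup.remaining();
            dup.get(out, pos, n);
            pos += n;
        }
        return out;
    }

    public static void main(String[] args) {
        ChunkedBufferSketch buf = new ChunkedBufferSketch(
            ByteBuffer.wrap(new byte[]{1, 2}),
            ByteBuffer.wrap(new byte[]{3, 4, 5}));
        System.out.println(buf.size());       // prints 5
        System.out.println(buf.toArray()[4]); // prints 5
    }
}
```

    Threading such a structure through `ManagedBuffer`, the RPC layer, and the 
serializers is what lets each layer pass chunks along without first flattening 
them into one contiguous allocation.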
    
    
    ## How was this patch tested?
    
    TODO: ....

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/witgo/spark SPARK-17082_ByteBuffer_2_ChunkedByteBuffer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14662.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14662
    
----
commit b6c1e3aeb3643745ebf3840cd210fbd1011bcf08
Author: Guoqiang Li <[email protected]>
Date:   2016-08-16T08:27:39Z

    Replace ByteBuffer with ChunkedByteBuffer

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
