GitHub user witgo opened a pull request:
https://github.com/apache/spark/pull/14662
[WIP][SPARK-17082][CORE] Replace ByteBuffer with ChunkedByteBuffer
## What changes were proposed in this pull request?
Replace `ByteBuffer` with `ChunkedByteBuffer`:
* ManagedBuffer
`public abstract ByteBuffer nioByteBuffer() throws IOException;` =>
`public abstract ChunkedByteBuffer nioByteBuffer() throws IOException;`
* FileSegmentManagedBuffer
add constructor `public FileSegmentManagedBuffer(long memoryMapBytes,
boolean lazyFileDescriptor, File file, long offset, long length)`
* NettyManagedBuffer
Support zero-copy in the `nioByteBuffer` method
* NioManagedBuffer
add constructor `public NioManagedBuffer(ChunkedByteBuffer buf)`
* RpcResponseCallback
`void onSuccess(ByteBuffer response)` => `void
onSuccess(ChunkedByteBuffer response)`
* TransportClient
`public long sendRpc(ByteBuffer message, final RpcResponseCallback
callback)` => `public long sendRpc(ChunkedByteBuffer message, final
RpcResponseCallback callback)`
`public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs)` =>
`ChunkedByteBuffer sendRpcSync(ChunkedByteBuffer message, long timeoutMs)`
`public void send(ByteBuffer message)` => `public void
send(ChunkedByteBuffer message)`
* SaslRpcHandler
`public void receive(TransportClient client, ByteBuffer message,
RpcResponseCallback callback)` => `public void receive(TransportClient client,
ChunkedByteBuffer message, RpcResponseCallback callback)`
`public void receive(TransportClient client, ByteBuffer message)` =>
`public void receive(TransportClient client, ChunkedByteBuffer message)`
* NoOpRpcHandler
`public void receive(TransportClient client, ByteBuffer message,
RpcResponseCallback callback)` => `public void receive(TransportClient client,
ChunkedByteBuffer message, RpcResponseCallback callback)`
* RpcHandler
`public abstract void receive(TransportClient client, ByteBuffer
message, RpcResponseCallback callback)` => `public abstract void
receive(TransportClient client, ChunkedByteBuffer message, RpcResponseCallback
callback)`
`public void receive(TransportClient client, ByteBuffer message)` =>
`public void receive(TransportClient client, ChunkedByteBuffer message)`
`public void onSuccess(ByteBuffer response)` => `public void
onSuccess(ChunkedByteBuffer response)`
* org.apache.spark.network.shuffle.protocol.Decoder
`BlockTransferMessage fromByteBuffer(ByteBuffer msg)` => `public static
BlockTransferMessage fromByteBuffer(ChunkedByteBuffer msg)`
* TorrentBroadcast
`def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer],
serializer: Serializer, compressionCodec: Option[CompressionCodec])` =>
`def unBlockifyObject[T: ClassTag](blocks: Array[ChunkedByteBuffer],
serializer: Serializer, compressionCodec: Option[CompressionCodec])`
* Executor
`def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber:
Int, taskName: String, serializedTask: ByteBuffer): Unit` =>
`def launchTask(context: ExecutorBackend, taskId: Long, attemptNumber:
Int, taskName: String, serializedTask: ChunkedByteBuffer): Unit`
* ExecutorBackend
`def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer):
Unit` => `def statusUpdate(taskId: Long, state: TaskState, data:
ChunkedByteBuffer): Unit`
* OneWayOutboxMessage
`case class OneWayOutboxMessage(content: ByteBuffer) extends
OutboxMessage` => `case class OneWayOutboxMessage(content: ChunkedByteBuffer)
extends OutboxMessage`
* org.apache.spark.scheduler.Task
`serializeWithDependencies(task: Task[_], currentFiles:
mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer:
SerializerInstance): ByteBuffer` =>
`serializeWithDependencies(task: Task[_], currentFiles:
mutable.Map[String, Long], currentJars: mutable.Map[String, Long], serializer:
SerializerInstance): ChunkedByteBuffer`
`deserializeWithDependencies(serializedTask: ByteBuffer):
(HashMap[String, Long], HashMap[String, Long], Properties, ByteBuffer)` =>
`deserializeWithDependencies(serializedTask: ChunkedByteBuffer):
(HashMap[String, Long], HashMap[String, Long], Properties, ChunkedByteBuffer)`
* TaskDescription
`private[spark] class TaskDescription(val taskId: Long, val
attemptNumber: Int, val executorId: String, val name: String, val index: Int,
val serializedTask: ByteBuffer)` => `private[spark] class
TaskDescription(val taskId: Long, val attemptNumber: Int, val executorId:
String, val name: String, val index: Int, _serializedTask: ChunkedByteBuffer)`
* DirectTaskResult
`private[spark] class DirectTaskResult[T](var valueBytes: ByteBuffer,
var accumUpdates: Seq[AccumulatorV2[_, _]]) extends TaskResult[T]` =>
`private[spark] class DirectTaskResult[T](var valueBytes:
ChunkedByteBuffer, var accumUpdates: Seq[AccumulatorV2[_, _]]) extends
TaskResult[T]`
* TaskResultGetter
`def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long,
serializedData: ByteBuffer): Unit` => `def enqueueSuccessfulTask(taskSetManager:
TaskSetManager, tid: Long, serializedData: ChunkedByteBuffer): Unit`
`def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long,
taskState: TaskState, serializedData: ByteBuffer)` => `def
enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState:
TaskState, serializedData: ChunkedByteBuffer)`
* TaskSchedulerImpl
`def statusUpdate(tid: Long, state: TaskState, serializedData:
ByteBuffer)` => `def statusUpdate(tid: Long, state: TaskState, serializedData:
ChunkedByteBuffer)`
* CoarseGrainedClusterMessages.LaunchTask
`case class LaunchTask(data: SerializableBuffer) extends
CoarseGrainedClusterMessage` => `case class LaunchTask(data: ChunkedByteBuffer)
extends CoarseGrainedClusterMessage`
* CoarseGrainedClusterMessages.StatusUpdate
`case class StatusUpdate(executorId: String, taskId: Long, state:
TaskState, data: SerializableBuffer) extends CoarseGrainedClusterMessage` =>
`case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
data: ChunkedByteBuffer) extends CoarseGrainedClusterMessage`
* org.apache.spark.scheduler.local.LocalSchedulerBackend
`private case class StatusUpdate(taskId: Long, state: TaskState,
serializedData: ByteBuffer)` => `private case class StatusUpdate(taskId: Long,
state: TaskState, serializedData: ChunkedByteBuffer)`
* SerializerInstance
`def serialize[T: ClassTag](t: T): ByteBuffer` => `def serialize[T:
ClassTag](t: T): ChunkedByteBuffer`
`def deserialize[T: ClassTag](bytes: ByteBuffer): T` => `def
deserialize[T: ClassTag](bytes: ChunkedByteBuffer): T`
`def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader):
T` => `def deserialize[T: ClassTag](bytes: ChunkedByteBuffer, loader:
ClassLoader): T`
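The common thread in the changes above is that a payload is passed around as a list of `ByteBuffer` chunks rather than one contiguous buffer, so it can exceed the 2 GB addressing limit of a single `ByteBuffer` and can be assembled without one large allocation or a defensive copy. The sketch below illustrates only that idea; the class name `ChunkedBufferSketch` and its methods are hypothetical stand-ins, not Spark's actual `ChunkedByteBuffer` API:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the idea behind a chunked byte buffer (not Spark's class):
// data is held as a list of ByteBuffer chunks instead of one contiguous buffer.
class ChunkedBufferSketch {
    private final List<ByteBuffer> chunks = new ArrayList<>();

    public void addChunk(ByteBuffer chunk) {
        chunks.add(chunk.duplicate()); // duplicate() shares the backing data: no copy
    }

    public long size() {
        long total = 0;
        for (ByteBuffer chunk : chunks) {
            total += chunk.remaining();
        }
        return total; // a long: the total may exceed Integer.MAX_VALUE
    }

    // Collapsing back into a single ByteBuffer is only possible while the total
    // fits in one buffer; callers that need the payload contiguous pay the copy here.
    public ByteBuffer toByteBuffer() {
        ByteBuffer merged = ByteBuffer.allocate((int) size());
        for (ByteBuffer chunk : chunks) {
            merged.put(chunk.duplicate());
        }
        merged.flip();
        return merged;
    }

    public static void main(String[] args) {
        ChunkedBufferSketch buf = new ChunkedBufferSketch();
        buf.addChunk(ByteBuffer.wrap("hello ".getBytes()));
        buf.addChunk(ByteBuffer.wrap("world".getBytes()));
        System.out.println(buf.size());
        System.out.println(new String(buf.toByteBuffer().array()));
    }
}
```

This is also why the signatures above change in both directions: producers (`serialize`, `sendRpc`) hand out chunk lists, and consumers (`deserialize`, `receive`) must accept them without forcing a merge.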
## How was this patch tested?
TODO: ....
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/witgo/spark SPARK-17082_ByteBuffer_2_ChunkedByteBuffer
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/14662.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #14662
----
commit b6c1e3aeb3643745ebf3840cd210fbd1011bcf08
Author: Guoqiang Li <[email protected]>
Date: 2016-08-16T08:27:39Z
Replace ByteBuffer with ChunkedByteBuffer
----