GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21346#discussion_r190997078
--- Diff:
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
---
@@ -244,6 +242,54 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
return requestId;
}
+ /**
+ * Send data to the remote end as a stream. This differs from stream() in that this is a request
+ * to *send* data to the remote end, not to receive it from the remote.
+ *
+ * @param meta meta data associated with the stream, which will be read completely on the
+ * receiving end before the stream itself.
+ * @param data this will be streamed to the remote end to allow for transferring large amounts
+ * of data without reading into memory.
+ * @param callback handles the reply -- onSuccess will only be called when both message and data
+ * are received successfully.
+ */
+ public long uploadStream(
+ ManagedBuffer meta,
+ ManagedBuffer data,
+ RpcResponseCallback callback) {
+ long startTime = System.currentTimeMillis();
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+ }
+
+ long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+ handler.addRpcRequest(requestId, callback);
+
+ channel.writeAndFlush(new UploadStream(requestId, meta, data))
+ .addListener(future -> {
+ if (future.isSuccess()) {
--- End diff --
My first reaction is that this is about the right time to refactor this
write listener into a helper method... all the instances in this class look
quite similar.
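
To illustrate the kind of helper being suggested, here is a minimal sketch.
It is not code from the pull request: the class name SendListenerSketch, the
method onSendComplete, and its parameters are all hypothetical, and
java.util.concurrent.CompletableFuture stands in for the future returned by
channel.writeAndFlush(...) so that the example runs without Netty. The failure
branch is also an assumption, since the quoted diff is cut off before it.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch: CompletableFuture stands in for the future returned by
// channel.writeAndFlush(...). None of these names come from the pull request.
class SendListenerSketch {

  /**
   * Attaches one shared completion handler to a pending send, so each
   * send method does not need its own copy of the listener.
   *
   * @param sendFuture  completes when the write has been flushed (assumed stand-in)
   * @param description what was sent, used in log and error messages
   * @param startTimeMs wall-clock time at which the send started
   * @param onFailure   invoked with a wrapped IOException if the write fails
   */
  static void onSendComplete(
      CompletableFuture<Void> sendFuture,
      String description,
      long startTimeMs,
      Consumer<IOException> onFailure) {
    sendFuture.whenComplete((ignored, cause) -> {
      if (cause == null) {
        // Success path: report how long the send took.
        long elapsedMs = System.currentTimeMillis() - startTimeMs;
        System.out.printf("Sending %s took %d ms%n", description, elapsedMs);
      } else {
        // Failure path: wrap the cause and hand it to the caller's handler.
        onFailure.accept(new IOException("Failed to send " + description, cause));
      }
    });
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    CompletableFuture<Void> send = new CompletableFuture<>();
    onSendComplete(send, "upload request", start,
        e -> System.err.println(e.getMessage()));
    send.complete(null); // simulate a successful write
  }
}
```

With a helper of this shape, each send method would only build its message,
register the callback, and pass the resulting future along with a short
description. Any per-request cleanup on failure would go in the onFailure
handler supplied by the caller.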
---