Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21346#discussion_r195464525
--- Diff:
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
---
@@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message,
RpcResponseCallback callback) {
handler.addRpcRequest(requestId, callback);
channel.writeAndFlush(new RpcRequest(requestId, new
NioManagedBuffer(message)))
- .addListener(future -> {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request {} to {} took {} ms",
requestId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send RPC %s to %s:
%s", requestId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- handler.removeRpcRequest(requestId);
- channel.close();
- try {
- callback.onFailure(new IOException(errorMsg,
future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback
handler!", e);
- }
- }
- });
+ .addListener(new RpcChannelListener(startTime, requestId, callback));
+
+ return requestId;
+ }
+
+ /**
+ * Send data to the remote end as a stream. This differs from stream()
in that this is a request
+ * to *send* data to the remote end, not to receive it from the remote.
+ *
+ * @param meta meta data associated with the stream, which will be read
completely on the
+ * receiving end before the stream itself.
+ * @param data this will be streamed to the remote end to allow for
transferring large amounts
+ * of data without reading into memory.
+ * @param callback handles the reply -- onSuccess will only be called
when both message and data
+ * are received successfully.
+ */
+ public long uploadStream(
+ ManagedBuffer meta,
+ ManagedBuffer data,
+ RpcResponseCallback callback) {
+ long startTime = System.currentTimeMillis();
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+ }
+
+ long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
--- End diff --
done
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]