Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21346#discussion_r195584794
--- Diff:
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
---
@@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message,
RpcResponseCallback callback) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
- long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+ long requestId = requestId();
handler.addRpcRequest(requestId, callback);
channel.writeAndFlush(new RpcRequest(requestId, new
NioManagedBuffer(message)))
- .addListener(future -> {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request {} to {} took {} ms",
requestId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send RPC %s to %s:
%s", requestId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- handler.removeRpcRequest(requestId);
- channel.close();
- try {
- callback.onFailure(new IOException(errorMsg,
future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback
handler!", e);
- }
- }
- });
+ .addListener(new RpcChannelListener(startTime, requestId, callback));
return requestId;
}
+ /**
+ * Send data to the remote end as a stream. This differs from stream()
in that this is a request
--- End diff --
I know you're in the "2 spaces after period camp", but that's 3.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]