Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21346#discussion_r195588878
--- Diff:
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
---
@@ -203,6 +197,76 @@ public void onFailure(Throwable e) {
}
}
+ /**
+ * Handle a request from the client to upload a stream of data.
+ */
+ private void processStreamUpload(final UploadStream req) {
+ assert (req.body() == null);
+ try {
+ RpcResponseCallback callback = new RpcResponseCallback() {
+ @Override
+ public void onSuccess(ByteBuffer response) {
+ respond(new RpcResponse(req.requestId, new
NioManagedBuffer(response)));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ respond(new RpcFailure(req.requestId,
Throwables.getStackTraceAsString(e)));
+ }
+ };
+ TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
+ channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
+ ByteBuffer meta = req.meta.nioByteBuffer();
+ StreamCallbackWithID streamHandler =
rpcHandler.receiveStream(reverseClient, meta, callback);
+ StreamCallbackWithID wrappedCallback = new StreamCallbackWithID() {
+ @Override
+ public void onData(String streamId, ByteBuffer buf) throws
IOException {
+ streamHandler.onData(streamId, buf);
+ }
+
+ @Override
+ public void onComplete(String streamId) throws IOException {
+ try {
+ streamHandler.onComplete(streamId);
+ callback.onSuccess(ByteBuffer.allocate(0));
+ } catch (Exception ex) {
+ IOException ioExc = new IOException("Failure post-processing
complete stream;" +
+ " failing this rpc and leaving channel active");
+ callback.onFailure(ioExc);
+ streamHandler.onFailure(streamId, ioExc);
+ }
+ }
+
+ @Override
+ public void onFailure(String streamId, Throwable cause) throws
IOException {
+ callback.onFailure(new IOException("Destination failed while
reading stream", cause));
+ streamHandler.onFailure(streamId, cause);
+ }
+
+ @Override
+ public String getID() {
+ return streamHandler.getID();
+ }
+ };
+ if (req.bodyByteCount > 0) {
+ StreamInterceptor interceptor = new StreamInterceptor(this,
wrappedCallback.getID(),
+ req.bodyByteCount, wrappedCallback);
+ frameDecoder.setInterceptor(interceptor);
+ } else {
+ wrappedCallback.onComplete(wrappedCallback.getID());
+ }
+ } catch (Exception e) {
+ logger.error("Error while invoking RpcHandler#receive() on RPC id "
+ req.requestId, e);
+ respond(new RpcFailure(req.requestId,
Throwables.getStackTraceAsString(e)));
+ // We choose to totally fail the channel, rather than trying to
recover as we do in other
+ // cases. We don't know how many bytes of the stream the client has
already sent for the
+ // stream, its not worth trying to recover.
--- End diff --
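Typo in the last comment line of the diff above: "its not worth trying to recover" should read "it's not worth trying to recover".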
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]