Hao Hao has posted comments on this change. ( http://gerrit.cloudera.org:8080/14425 )
Change subject: KUDU-2971 p1: add subprocess module
......................................................................

Patch Set 4:

(8 comments)

> Patch Set 2:
>
> > Patch Set 2: Code-Review+1
> >
> > (1 comment)
> >
> > I think this refactoring makes sense. I'm also not opposed to retaining the
> > JSON support, since it's still being used by "kudu test mini_cluster".
> >
> > I haven't looked at the other patches yet, but I do wonder how we're going
> > to multiplex on top of the subprocess protocol. Thinking out loud:
> >
> > Naively, we could put a lock around the entire subprocess protocol and
> > force users to effectively do:
> > 1. Lock
> > 2. Send my message
> > 3. Wait for and receive response.
> > 4. Unlock
> >
> > But that'll perform poorly as there could only be one outstanding request
> > this way.
> >
> > A better approach would be to allow multiple producers. On the send side,
> > maybe that's done by locking SendMessage and forcing threads to call
> > SendMessage directly. Or maybe it's done via threads posting messages to a
> > blocking queue and a dedicated thread waiting on that queue and calling
> > SendMessage. Would be interesting to compare the two.
> >
> > With multiple producers, the receive side will probably need a dedicated
> > thread blocked on ReceiveMessage. Whenever it gets a new message, it has to
> > somehow match it up with a sender. Using a sequence ID field in the message
> > makes sense, but that muddies the abstraction in that now some aspect of
> > the protobuf definition includes information that's necessary for the
> > transport protocol (rather than the "application").
> >
> > Anyway, the refactor looks good; the rest of these issues will shake out
> > separately.
>
> Yeah, I chatted briefly about this with Hao on Slack and alluded to something
> similar in my comments on the Java patch. I think for concurrent calls, we'd
> want an ID field like what we have in the RPC headers.
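
To make the ID-matching idea in the quoted discussion concrete, here is a minimal sketch of how a dedicated receiver thread could hand responses back to the right caller. It is an illustration only, not code from this patch: CallTracker, Register, Deliver and Wait are hypothetical names, and responses are plain strings rather than protobuf messages.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

// Hypothetical helper (an assumption for illustration): tracks outstanding
// calls and matches each response to its caller by message ID.
class CallTracker {
 public:
  // Called by a sender before its request is queued. Returns the ID that
  // would be stamped on the outgoing message.
  int64_t Register() {
    std::lock_guard<std::mutex> l(lock_);
    int64_t id = next_id_++;
    outstanding_.insert(id);
    return id;
  }

  // Called by the dedicated receiver thread for every message it reads.
  // Returns false if no caller is waiting on this ID.
  bool Deliver(int64_t id, std::string response) {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (outstanding_.erase(id) == 0) return false;
      responses_.emplace(id, std::move(response));
    }
    cv_.notify_all();
    return true;
  }

  // Called by the sender to block until its response arrives. The ID must
  // come from Register(); waiting on any other ID would block forever.
  std::string Wait(int64_t id) {
    std::unique_lock<std::mutex> l(lock_);
    cv_.wait(l, [&] { return responses_.count(id) > 0; });
    std::string response = std::move(responses_[id]);
    responses_.erase(id);
    return response;
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  int64_t next_id_ = 0;
  std::unordered_set<int64_t> outstanding_;
  std::unordered_map<int64_t, std::string> responses_;
};
```

With something like this, several callers can have requests in flight at once, and responses may arrive in any order. A single condition variable wakes every waiter on each delivery, which is simple but not the cheapest option; a per-call notification would avoid the spurious wakeups.
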
In https://gerrit.cloudera.org/c/14625/, I implemented something similar to what Adar described: senders post request messages to a blocking queue, and a dedicated thread waits on that queue and sends each message to the underlying subprocess as it arrives. On the receive side, another dedicated thread blocks on receiving messages from the subprocess and hands each response to the sender with the matching message ID. I chose the blocking queue so that senders don't need to block on a lock around SendMessage.

http://gerrit.cloudera.org:8080/#/c/14425/2//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14425/2//COMMIT_MSG@8
PS2, Line 8:
: Utility classes exist that allow for IPC over stdin/stdout via protobuf
: and JSON-encoded protobuf. This commit moves those classes into their
: own directory so it can be reused by other subprocesses.
:
> nit: this sentence is a bit long to follow. Maybe:
Done

http://gerrit.cloudera.org:8080/#/c/14425/2//COMMIT_MSG@13
PS2, Line 13: Following commits can then extend it to support concurrent communications
: with subprocess. There are no functional changes in this patch.
:
> nit: maybe mention that this patch has no functional changes.
Done

http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.h
File src/kudu/subprocess/subprocess_protocol.h:

http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.h@40
PS2, Line 40: // Each message is serialized into a protobuf-like JSON representation
: // terminated with a newline character.
: JSON,
> Nevermind, I wrote this after not looking through all the files and realizi
Ack

http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.cc
File src/kudu/subprocess/subprocess_protocol.cc:

http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.cc@45
PS2, Line 45: const int SubprocessProtocol::kMaxMessageBytes = 1024 * 1024;
> This cap will probably need to be revisited.
I guess the main reason you think this needs to be revisited is that different use cases (e.g. the Ranger integration or the Atlas integration) may have different requirements for the maximum bytes per message? I added a 'max_msg_bytes' member so that different instances of SubprocessProtocol can have different caps.

http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.cc@164
PS2, Line 164: return Status::OK();
: }
:
: Status SubprocessProtocol::DoRead(faststring* buf) {
:   uint8_t* pos = buf->data();
:   size_t rem = buf->length();
:   while (rem > 0) {
:     ssize_t r;
:     RETRY_ON_EINTR(r, read(read_fd_, pos, rem));
:     if (r == -1) {
:       return Status::IOError("Error reading from pipe", "", errno);
:     }
:     if (r == 0) {
:       return Status::EndOfFile("Other end of pipe was closed");
:     }
:     DCHECK_GE(rem, r);
:     rem -= r;
:     pos += r;
:   }
:   return Status::OK();
: }
:
: Status SubprocessProtocol::DoWrite(const faststring& buf) {
:   const uint8_t* pos = buf.data();
:   size_t rem = buf.length();
:   while (rem > 0) {
:     ssize_t r;
:     RETRY_ON_EINTR(r, write(write_fd_, pos, rem));
:     if (r == -1) {
:       if (errno == EPIPE) {
:         return Status::EndOfFile("Other end of pipe was closed");
:       }
:       return Status::IOError("Error writing to pipe", "", errno);
:     }
:     DCHECK_GE(rem, r);
:     rem -= r;
:     pos += r;
:   }
:
> Actually, I think reusing all of this is fine; I just don't think we need t
Ack

http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc
File src/kudu/subprocess/subprocess_protocol.cc:

http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc@45
PS3, Line 45: kMaxMessageBytes
> shouldn't this be a flag instead? RPC max message size is 50 MB by default
I added a 'max_msg_bytes' member so that different instances of SubprocessProtocol can have different caps. Each user of the protocol can then define its own flag to configure it.
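
For illustration, the per-instance cap could look roughly like the sketch below. This is not the code in the patch: FramedProtocol, kDefaultMaxMessageBytes and CheckBodySize are hypothetical names, and only the size check is shown. The 1 MiB default mirrors the kMaxMessageBytes constant quoted above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical stand-in for SubprocessProtocol (an assumption for
// illustration); only the message size check is sketched here.
class FramedProtocol {
 public:
  // Default cap, mirroring the 1 MiB constant quoted in the review.
  static constexpr std::size_t kDefaultMaxMessageBytes = 1024 * 1024;

  // Each instance takes its own cap, so a caller could pass in the value
  // of a flag that it defines itself.
  explicit FramedProtocol(std::size_t max_msg_bytes = kDefaultMaxMessageBytes)
      : max_msg_bytes_(max_msg_bytes) {
    assert(max_msg_bytes_ > 0);
  }

  // Returns true if a message body of 'body_size' bytes is within this
  // instance's cap. Otherwise describes the problem in 'error' and
  // returns false.
  bool CheckBodySize(uint32_t body_size, std::string* error) const {
    if (body_size > max_msg_bytes_) {
      *error = "message size " + std::to_string(body_size) +
               " exceeds maximum of " + std::to_string(max_msg_bytes_) +
               " bytes";
      return false;
    }
    return true;
  }

 private:
  const std::size_t max_msg_bytes_;
};
```

Keeping the cap as a constructor argument rather than a single global flag means two integrations in the same process can use different limits without affecting each other.
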

http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc@97
PS3, Line 97: case SerializationMode::PB:
> isn't this little-endian? gutil/endian.h says " // Functions to do unalign
Yeah, the comment is a bit confusing to me, but I think this is big-endian. See https://github.com/apache/kudu/blob/master/src/kudu/gutil/endian.h#L370.

http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc@120
PS3, Line 120: break;
> do we need to crash the server if an unknown serialization mode is used? al
The serialization mode should never be unknown, so crashing the server should be fine.

--
To view, visit http://gerrit.cloudera.org:8080/14425
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If73e27772e1897a04f04229c4906a24c61e361f2
Gerrit-Change-Number: 14425
Gerrit-PatchSet: 4
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Tidy Bot (241)
Gerrit-Comment-Date: Mon, 18 Nov 2019 17:53:28 +0000
Gerrit-HasComments: Yes
