Hao Hao has posted comments on this change. ( http://gerrit.cloudera.org:8080/14425 )

Change subject: KUDU-2971 p1: add subprocess module
......................................................................


Patch Set 4:

(8 comments)

> Patch Set 2:
>
> > Patch Set 2: Code-Review+1
> >
> > (1 comment)
> >
> > I think this refactoring makes sense. I'm also not opposed to retaining the 
> > JSON support, since it's still being used by "kudu test mini_cluster".
> >
> > I haven't looked at the other patches yet, but I do wonder how we're going 
> > to multiplex on top of the subprocess protocol. Thinking out loud:
> >
> > Naively, we could put a lock around the entire subprocess protocol and 
> > force users to effectively do:
> > 1. Lock
> > 2. Send my message
> > 3. Wait for and receive response.
> > 4. Unlock
> >
> > But that'll perform poorly as there could only be one outstanding request 
> > this way.
> >
> > A better approach would be to allow multiple producers. On the send side, 
> > maybe that's done by locking SendMessage and forcing threads to call 
> > SendMessage directly. Or maybe it's done via threads posting messages to a 
> > blocking queue and a dedicated thread waiting on that queue and calling 
> > SendMessage. Would be interesting to compare the two.
> >
> > With multiple producers, the receive side will probably need a dedicated 
> > thread blocked on ReceiveMessage. Whenever it gets a new message, it has to 
> > somehow match it up with a sender. Using a sequence ID field in the message 
> > makes sense, but that muddies the abstraction in that now some aspect of 
> > the protobuf definition includes information that's necessary for the 
> > transport protocol (rather than the "application").
> >
> > Anyway, the refactor looks good; the rest of these issues will shake out 
> > separately.
>
> Yeah, I chatted briefly about this with Hao on Slack and alluded to something 
> similar in my comments on the Java patch. I think for concurrent calls, we'd 
> want an ID field like what we have in the RPC headers.

In https://gerrit.cloudera.org/c/14625/, I implemented something similar to what
Adar described. The sender posts request messages to a blocking queue, and a
dedicated thread waits on that queue and sends each message to the underlying
subprocess as it arrives. On the receive side, another dedicated thread blocks
on receiving messages from the subprocess and hands each response to the sender
whose request has the matching message ID. I chose the blocking queue so that
senders don't have to block on a lock around SendMessage.
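A minimal C++ sketch of the queue-based design described above. The subprocess is replaced by two in-memory queues so the example runs standalone. `BlockingQueue`, `Message`, `SubprocessClient`, `Call()` and the integer `id` field are hypothetical names for illustration, not the API in change 14625. A real implementation would call SendMessage()/ReceiveMessage() on the subprocess pipes where this sketch pushes to and pops from the queues.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <future>
#include <map>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <utility>

// Unbounded blocking queue; Shutdown() wakes all blocked consumers.
template <typename T>
class BlockingQueue {
 public:
  void Put(T item) {
    std::lock_guard<std::mutex> l(mu_);
    q_.push(std::move(item));
    cv_.notify_one();
  }
  // Blocks until an item is available; returns std::nullopt once the queue
  // has been shut down and drained.
  std::optional<T> Take() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return shutdown_ || !q_.empty(); });
    if (q_.empty()) return std::nullopt;
    T item = std::move(q_.front());
    q_.pop();
    return item;
  }
  void Shutdown() {
    std::lock_guard<std::mutex> l(mu_);
    shutdown_ = true;
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<T> q_;
  bool shutdown_ = false;
};

// Hypothetical envelope: 'id' plays the role of the message ID field.
struct Message {
  int64_t id;
  std::string body;
};

// Hypothetical multiplexing client. The two "pipe" queues stand in for the
// subprocess's stdin/stdout.
class SubprocessClient {
 public:
  SubprocessClient(BlockingQueue<Message>* to_child,
                   BlockingQueue<Message>* from_child)
      : to_child_(to_child), from_child_(from_child) {
    sender_ = std::thread([this] { SendLoop(); });
    receiver_ = std::thread([this] { ReceiveLoop(); });
  }
  // Requires every Call() to have returned; closing from_child_ mimics EOF.
  ~SubprocessClient() {
    outbound_.Shutdown();
    sender_.join();
    from_child_->Shutdown();
    receiver_.join();
    assert(pending_.empty());
  }

  // Safe to call from many threads; blocks until the matching response arrives.
  std::string Call(std::string body) {
    std::promise<std::string> p;
    std::future<std::string> f = p.get_future();
    int64_t id;
    {
      std::lock_guard<std::mutex> l(mu_);
      id = next_id_++;
      pending_.emplace(id, std::move(p));
    }
    outbound_.Put(Message{id, std::move(body)});  // no lock held across the send
    return f.get();
  }

 private:
  // Dedicated sender thread: the only writer to the subprocess.
  void SendLoop() {
    while (auto msg = outbound_.Take()) to_child_->Put(std::move(*msg));
  }
  // Dedicated receiver thread: routes each response to the caller whose
  // request carried the same ID.
  void ReceiveLoop() {
    while (auto msg = from_child_->Take()) {
      std::promise<std::string> p;
      {
        std::lock_guard<std::mutex> l(mu_);
        auto it = pending_.find(msg->id);
        if (it == pending_.end()) continue;  // unknown ID: drop it
        p = std::move(it->second);
        pending_.erase(it);
      }
      p.set_value(std::move(msg->body));
    }
  }

  BlockingQueue<Message>* to_child_;
  BlockingQueue<Message>* from_child_;
  BlockingQueue<Message> outbound_;
  std::mutex mu_;
  int64_t next_id_ = 0;
  std::map<int64_t, std::promise<std::string>> pending_;
  std::thread sender_;
  std::thread receiver_;
};
```

Only the sender thread ever writes to the subprocess. Callers therefore contend only briefly, on the queue lock and the pending-map lock, rather than holding a lock for the duration of a write.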

http://gerrit.cloudera.org:8080/#/c/14425/2//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14425/2//COMMIT_MSG@8
PS2, Line 8:
           : Utility classes exist that allow for IPC over stdin/stdout via protobuf
           : and JSON-encoded protobuf. This commit moves those classes into their
           : own directory so it can be reused by other subprocesses.
           :
           :
> nit: this sentence is a bit long to follow. Maybe:
Done


http://gerrit.cloudera.org:8080/#/c/14425/2//COMMIT_MSG@13
PS2, Line 13: Following commits can then extend it to support concurrent communications
            : with subprocess. There are no functional changes in this patch.
            :
> nit: maybe mention that this patch has no functional changes.
Done


http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.h
File src/kudu/subprocess/subprocess_protocol.h:

http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.h@40
PS2, Line 40:     // Each message is serialized into a protobuf-like JSON representation
            :     // terminated with a newline character.
            :     JSON,
> Nevermind, I wrote this after not looking through all the files and realizi
Ack


http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.cc
File src/kudu/subprocess/subprocess_protocol.cc:

http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.cc@45
PS2, Line 45: const int SubprocessProtocol::kMaxMessageBytes = 1024 * 1024;
> This cap will probably need to be revisited.
I guess the main reason you think this needs revisiting is that different uses
(e.g. the Ranger or Atlas integration) may need different maximum message sizes?
I added a 'max_msg_bytes' member so that different instances of
SubprocessProtocol can have different caps.
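A minimal sketch of a per-instance cap, assuming the check runs after a message's length prefix is read and before its body is allocated. `FramedProtocol` and `CheckMessageSize()` are hypothetical names; only the idea of a 'max_msg_bytes' member comes from the reply above.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for SubprocessProtocol showing only the size cap.
// Each instance carries its own limit instead of sharing one global constant.
class FramedProtocol {
 public:
  explicit FramedProtocol(uint32_t max_msg_bytes)
      : max_msg_bytes_(max_msg_bytes) {
    assert(max_msg_bytes > 0);
  }

  // Meant to run after the length prefix is read but before the body buffer
  // is allocated, so a corrupt or oversized length is rejected cheaply.
  // Returns an empty string on success, otherwise an error description.
  std::string CheckMessageSize(uint32_t size) const {
    if (size > max_msg_bytes_) {
      return "message size " + std::to_string(size) + " exceeds maximum of " +
             std::to_string(max_msg_bytes_) + " bytes";
    }
    return "";
  }

 private:
  const uint32_t max_msg_bytes_;
};
```

For example, one user could keep the original 1 MiB cap while another constructs its instance with a larger value taken from its own flag.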


http://gerrit.cloudera.org:8080/#/c/14425/2/src/kudu/subprocess/subprocess_protocol.cc@164
PS2, Line 164:   return Status::OK();
             : }
             :
             : Status SubprocessProtocol::DoRead(faststring* buf) {
             :   uint8_t* pos = buf->data();
             :   size_t rem = buf->length();
             :   while (rem > 0) {
             :     ssize_t r;
             :     RETRY_ON_EINTR(r, read(read_fd_, pos, rem));
             :     if (r == -1) {
             :       return Status::IOError("Error reading from pipe", "", errno);
             :     }
             :     if (r == 0) {
             :       return Status::EndOfFile("Other end of pipe was closed");
             :     }
             :     DCHECK_GE(rem, r);
             :     rem -= r;
             :     pos += r;
             :   }
             :   return Status::OK();
             : }
             :
             : Status SubprocessProtocol::DoWrite(const faststring& buf) {
             :   const uint8_t* pos = buf.data();
             :   size_t rem = buf.length();
             :   while (rem > 0) {
             :     ssize_t r;
             :     RETRY_ON_EINTR(r, write(write_fd_, pos, rem));
             :     if (r == -1) {
             :       if (errno == EPIPE) {
             :         return Status::EndOfFile("Other end of pipe was closed");
             :       }
             :       return Status::IOError("Error writing to pipe", "", errno);
             :     }
             :     DCHECK_GE(rem, r);
             :     rem -= r;
             :     pos += r;
             :   }
             : 
> Actually, I think reusing all of this is fine; I just don't think we need t
Ack


http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc
File src/kudu/subprocess/subprocess_protocol.cc:

http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc@45
PS3, Line 45: kMaxMessageBytes
> shouldn't this be a flag instead? RPC max message size is 50 MB by default
I added a 'max_msg_bytes' member so that different instances of
SubprocessProtocol can have different caps, and each user can define its own
flag to configure it.


http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc@97
PS3, Line 97:     case SerializationMode::PB:
> isn't this little-endian? gutil/endian.h says "  // Functions to do unalign
Yeah, the comment is a bit confusing to me, but I think this is big-endian:
https://github.com/apache/kudu/blob/master/src/kudu/gutil/endian.h#L370.
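To make the byte-order question concrete, here is a sketch assuming each PB-mode message is framed with a 4-byte length prefix. The helpers are hypothetical and hand-written with shifts, not gutil's BigEndian functions. Big-endian means the most significant byte is written first, so a length of 3 is stored as 00 00 00 03 regardless of the host's byte order.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helpers: store/load a 32-bit value most-significant byte first,
// independent of the host's native byte order.
void StoreBigEndian32(uint8_t* dst, uint32_t v) {
  dst[0] = static_cast<uint8_t>(v >> 24);
  dst[1] = static_cast<uint8_t>(v >> 16);
  dst[2] = static_cast<uint8_t>(v >> 8);
  dst[3] = static_cast<uint8_t>(v);
}

uint32_t LoadBigEndian32(const uint8_t* src) {
  return (static_cast<uint32_t>(src[0]) << 24) |
         (static_cast<uint32_t>(src[1]) << 16) |
         (static_cast<uint32_t>(src[2]) << 8) |
         static_cast<uint32_t>(src[3]);
}

// Frames a serialized message as <4-byte big-endian length><body>.
std::string FrameMessage(const std::string& body) {
  assert(body.size() <= UINT32_MAX);
  uint8_t header[4];
  StoreBigEndian32(header, static_cast<uint32_t>(body.size()));
  std::string out(reinterpret_cast<const char*>(header), 4);
  out += body;
  return out;
}
```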


http://gerrit.cloudera.org:8080/#/c/14425/3/src/kudu/subprocess/subprocess_protocol.cc@120
PS3, Line 120:       break;
> do we need to crash the server if an unknown serialization mode is used? al
We should never encounter an unknown serialization mode, so crashing the server
should be fine.



--
To view, visit http://gerrit.cloudera.org:8080/14425

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If73e27772e1897a04f04229c4906a24c61e361f2
Gerrit-Change-Number: 14425
Gerrit-PatchSet: 4
Gerrit-Owner: Hao Hao
Gerrit-Reviewer: Adar Dembo
Gerrit-Reviewer: Andrew Wong
Gerrit-Reviewer: Attila Bukor
Gerrit-Reviewer: Hao Hao
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Tidy Bot (241)
Gerrit-Comment-Date: Mon, 18 Nov 2019 17:53:28 +0000
Gerrit-HasComments: Yes
