Adar Dembo has posted comments on this change. (http://gerrit.cloudera.org:8080/15185)

Change subject: [cpp] KUDU-2971: protobuf-based wrapper for subprocesses
......................................................................


Patch Set 1:

(24 comments)

May need to modify the dist-test infra to get this working there.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/CMakeLists.txt
File src/kudu/subprocess/CMakeLists.txt:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/CMakeLists.txt@38
PS1, Line 38: execute_process(COMMAND ./gradlew :kudu-subprocess-echo:jar
I don't think you want to use execute_process() here, as it means this code 
gets run when cmake is run. What you actually want to do is to link this into 
the dependency graph so that if you build kudu_subprocess or 
subprocess_server-test, you also build the Java subprocess stuff. Look into 
add_custom_command() for that.

But shouldn't we be using add_jar() from UseJava.cmake instead? See
src/kudu/hms/CMakeLists.txt (and 
https://cmake.org/cmake/help/latest/module/UseJava.html).


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h
File src/kudu/subprocess/call.h:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@34
PS1, Line 34: class SubprocessCall {
The thread safety semantics aren't super clear, nor is it obvious from usage 
just _how_ thread safe this needs to be. Instinctively I think it's safer 
than it needs to be (i.e. maybe some of the "if !cb_ then return" checks can be 
converted into DCHECKs?), but I'm not quite sure.

Also, as a helper class for the subprocess server, why is this exposed outside 
of subprocess/server.cc at all? Seems like an implementation detail.


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@47
PS1, Line 47: const MonoTime&
MonoTime is just a wrapper around an int64_t, so return it by value.


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@71
PS1, Line 71: SubprocessResponsePB&&
The Google C++ Style Guide (GSG) forbids rvalue references outside of obvious 
usage (e.g. move constructors, move assignment operators, etc.). Just pass by 
value and ensure that callers use std::move(); clang-tidy will tell you if you 
did it wrong.
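
A minimal sketch of the pass-by-value "sink" pattern, assuming a hypothetical CallSketch class and a ResponsePB stand-in for SubprocessResponsePB (neither comes from the patch):

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for SubprocessResponsePB; any movable type works.
struct ResponsePB {
  std::string payload;
};

// Hypothetical class and method names, for illustration only.
class CallSketch {
 public:
  // Take the argument by value. A caller passing std::move(x) pays for a
  // move; a caller passing a named lvalue pays for exactly one copy.
  void SetResponse(ResponsePB resp) {
    // Move the parameter into the member, so no second copy is made.
    resp_ = std::move(resp);
  }

  const ResponsePB& response() const { return resp_; }

 private:
  ResponsePB resp_;
};
```

Callers write `call.SetResponse(std::move(resp));` when they are done with `resp`; no rvalue-reference parameter is needed to get the move.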


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@97
PS1, Line 97: This is important because the callback may
            :   // destroy the request.
I don't see how that can be safe given L67, L79, and L90, all of which access 
internal state _after_ invoking the callback.


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@99
PS1, Line 99:   mutable simple_spinlock lock_;
Given that this is held while invoking the callback and that the callback may 
do IO, seems better to use a sleeping mutex.
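
A sketch of one way to address both of the concerns above, assuming a hypothetical CallbackOnce class rather than the patch's SubprocessCall. It guards state with a sleeping std::mutex, takes the callback out under the lock, and invokes it last and outside the lock, so the callback is free to destroy the object. Note that this also swaps in "don't hold the lock while running the callback" alongside the sleeping-mutex suggestion:

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical sketch; not the patch's SubprocessCall.
class CallbackOnce {
 public:
  using Callback = std::function<void(const std::string&)>;

  explicit CallbackOnce(Callback cb) : cb_(std::move(cb)) {}

  // Returns true if this invocation ran the callback, false if an earlier
  // invocation (e.g. a timeout path) already did.
  bool Respond(const std::string& result) {
    Callback cb;
    {
      std::lock_guard<std::mutex> l(lock_);
      if (!cb_) {
        return false;  // Already responded.
      }
      // Take ownership of the callback and clear the member while locked,
      // so no other thread can run it a second time.
      cb = std::move(cb_);
      cb_ = nullptr;
    }
    // Lock released. The callback may destroy *this, so no member may be
    // touched after this line.
    cb(result);
    return true;
  }

 private:
  std::mutex lock_;  // Sleeping mutex rather than a spinlock.
  Callback cb_;
};
```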


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.h
File src/kudu/subprocess/server.h:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.h@42
PS1, Line 42: class SubprocessCall;
Shouldn't be necessary given you're including call.h. Then again, why is call.h 
being included at all?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc
File src/kudu/subprocess/server.cc:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@44
PS1, Line 44:
            : DEFINE_int32(subprocess_request_queue_size_bytes, 4 * 1024 * 1024,
            :              "Maximum size in bytes of the outbound request queue");
            : TAG_FLAG(subprocess_request_queue_size_bytes, advanced);
            :
            : DEFINE_int32(subprocess_response_queue_size_bytes, 4 * 1024 * 1024,
            :              "Maximum size in bytes of the inbound response queue");
            : TAG_FLAG(subprocess_response_queue_size_bytes, advanced);
What happens if a single request or response exceeds this amount? Does the 
BlockingQueue allow the element to be added to the queue anyway? Or does it 
fail?
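
To make the question concrete, here is a hypothetical byte-bounded queue that answers it one way: an element larger than the entire budget is rejected up front instead of being admitted or left waiting for space that can never appear. This illustrates a possible policy only; it is not a description of what Kudu's BlockingQueue does, and ByteBoundedQueue and PutResult are made up for the sketch:

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical result codes for this sketch only.
enum class PutResult { kOk, kFull, kTooLarge };

class ByteBoundedQueue {
 public:
  explicit ByteBoundedQueue(size_t max_bytes) : max_bytes_(max_bytes) {}

  PutResult Put(std::string elem) {
    // An element that could never fit, even into an empty queue, is
    // rejected outright rather than overshooting the configured limit.
    if (elem.size() > max_bytes_) return PutResult::kTooLarge;
    std::lock_guard<std::mutex> l(lock_);
    // Otherwise, admit it only if the total stays within the budget.
    if (bytes_ + elem.size() > max_bytes_) return PutResult::kFull;
    bytes_ += elem.size();
    q_.push_back(std::move(elem));
    return PutResult::kOk;
  }

  size_t bytes() const {
    std::lock_guard<std::mutex> l(lock_);
    return bytes_;
  }

 private:
  const size_t max_bytes_;
  mutable std::mutex lock_;
  size_t bytes_ = 0;
  std::deque<std::string> q_;
};
```

Distinguishing kTooLarge from kFull lets the caller fail the call immediately instead of retrying an enqueue that can never succeed.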


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@64
PS1, Line 64: add enqueue
Nit: "add" and "enqueue" are duplicative; pick one.


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@112
PS1, Line 112:   RETURN_NOT_OK(Thread::Create("subprocess", "reader", &SubprocessServer::ReceiveMessagesTask,
             :                                this, &read_thread_));
             :   RETURN_NOT_OK(Thread::Create("subprocess", "writer", &SubprocessServer::SendMessagesTask,
             :                                this, &write_thread_));
             :   RETURN_NOT_OK(Thread::Create("subprocess", "deadline-checker",
             :                                &SubprocessServer::CheckDeadlinesTask,
             :                                this, &deadline_checker_));
Nit: the "Task" suffix suggests a one-time operation rather than an active 
thread. Maybe use "Thread" suffixes?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@128
PS1, Line 128:   shared_ptr<SubprocessCall> call(new SubprocessCall(req, resp, &cb));
Could we use make_shared here? Could we move cb into the call instead of 
storing a pointer to it?
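
Roughly what that could look like, assuming hypothetical RequestPB/ResponsePB stand-ins, a std::function callback type, and a MakeCall() helper (none of these names come from the patch). The call owns a moved-in copy of the callback instead of a pointer to the caller's:

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for the request/response protobufs and callback.
struct RequestPB { std::string body; };
struct ResponsePB { std::string body; };
using ResponseCallback = std::function<void(const ResponsePB&)>;

class SubprocessCallSketch {
 public:
  // The call owns its callback, so it cannot dangle if the caller's
  // std::function goes out of scope before the response arrives.
  SubprocessCallSketch(const RequestPB* req, ResponsePB* resp,
                       ResponseCallback cb)
      : req_(req), resp_(resp), cb_(std::move(cb)) {}

  void Respond(ResponsePB r) {
    *resp_ = std::move(r);
    cb_(*resp_);
  }

  const RequestPB* req() const { return req_; }

 private:
  const RequestPB* req_;
  ResponsePB* resp_;
  ResponseCallback cb_;
};

std::shared_ptr<SubprocessCallSketch> MakeCall(const RequestPB* req,
                                               ResponsePB* resp,
                                               ResponseCallback cb) {
  // make_shared allocates the object and its control block together.
  return std::make_shared<SubprocessCallSketch>(req, resp, std::move(cb));
}
```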


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@134
PS1, Line 134:   if (closing_) {
             :     return;
             :   }
             :   // Stop further work from happening by killing the subprocess and shutting
             :   // down the queues.
             :   closing_.store(true);
Should use a CAS to avoid multiple threads racing on Shutdown and executing it 
more than once.
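
A minimal sketch of a CAS-guarded shutdown, assuming a hypothetical ShutdownOnce class; only the thread that wins the false-to-true transition on closing_ runs the shutdown body:

```cpp
#include <atomic>

class ShutdownOnce {
 public:
  // Returns true only for the single caller that actually performs shutdown.
  bool Shutdown() {
    bool expected = false;
    // Atomically flip closing_ from false to true. If another thread already
    // flipped it, the CAS fails and this caller returns without doing work.
    if (!closing_.compare_exchange_strong(expected, true)) {
      return false;
    }
    // ... kill the subprocess, shut down the queues, join threads ...
    ++shutdown_count_;
    return true;
  }

  int shutdown_count() const { return shutdown_count_.load(); }

 private:
  std::atomic<bool> closing_{false};
  std::atomic<int> shutdown_count_{0};
};
```

Since the expected value is always false, `if (closing_.exchange(true)) return;` is an equivalent, slightly shorter spelling.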


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@140
PS1, Line 140:   WARN_NOT_OK(process_->KillAndWait(SIGTERM), "failed to stop subprocess");
             :   response_queue_.Shutdown();
             :   outbound_call_queue_.Shutdown();
Curious as to whether there's a specific shutdown order we must follow, and why.


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@165
PS1, Line 165:   DCHECK(message_protocol_) << "message protocol is not initialized";
Want to mirror this in SendMessagesTask?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@171
PS1, Line 171:       // The underlying pipe was closed. We're likely shutting down.
If we get an EOF, is closing_ going to be true? What's the relationship between 
EOF and closing_?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@174
PS1, Line 174:     WARN_NOT_OK(s, "failed to receive response from the subprocess");
Should this be a fatal error? Why or why not?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@176
PS1, Line 176:       // The queue has been shut down and we should shut down too.
As with the EOF case above, should we assert anything about closing_?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@186
PS1, Line 186:     if (!response_queue_.BlockingGet(&resp)) {
Consider BlockingDrainTo to reduce the number of wake ups under heavy load.
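
For reference, the batching idea in standalone form. DrainQueue is a hypothetical minimal queue written for this sketch, not Kudu's BlockingQueue; its BlockingDrainTo only mirrors the name used in the comment. A single wakeup moves every queued element out under one lock acquisition:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

template <typename T>
class DrainQueue {
 public:
  void Put(T t) {
    {
      std::lock_guard<std::mutex> l(lock_);
      q_.push_back(std::move(t));
    }
    not_empty_.notify_one();
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    not_empty_.notify_all();
  }

  // Blocks until at least one element is available, then appends *all*
  // queued elements to 'out'. Returns false once the queue is shut down
  // and empty.
  bool BlockingDrainTo(std::vector<T>* out) {
    std::unique_lock<std::mutex> l(lock_);
    not_empty_.wait(l, [this] { return !q_.empty() || shutdown_; });
    if (q_.empty()) return false;
    while (!q_.empty()) {
      out->push_back(std::move(q_.front()));
      q_.pop_front();
    }
    return true;
  }

 private:
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<T> q_;
  bool shutdown_ = false;
};
```

A consumer thread would loop on `while (q.BlockingDrainTo(&batch)) { /* handle batch */ batch.clear(); }`, paying one wakeup per batch rather than one per element.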


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@188
PS1, Line 188:       return;
Assert on closing_?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@191
PS1, Line 191:       LOG(WARNING) << Substitute("Received invalid response: $0",
             :                                  pb_util::SecureDebugString(resp));
             :       continue;
Hmm, shouldn't we CHECK/DCHECK on this? Seems like it'd indicate a programming 
error on our part.


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@209
PS1, Line 209:   while (!closing_.load()) {
This is a tight loop, which probably isn't what you wanted.
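
One way to avoid spinning, sketched with standard-library primitives and a hypothetical PeriodicChecker class: run a pass, then sleep on a condition variable with a timeout, so Shutdown() can still wake the thread immediately. The same shape applies to whatever timed-wait primitive the codebase prefers:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

class PeriodicChecker {
 public:
  explicit PeriodicChecker(std::chrono::milliseconds period)
      : period_(period) {}

  // Runs 'check' once per period until Shutdown() is called.
  template <typename F>
  void Run(F check) {
    std::unique_lock<std::mutex> l(lock_);
    while (!closing_) {
      l.unlock();
      check();  // Do the work without holding the lock.
      l.lock();
      // Sleeps for up to 'period_', returning early if closing_ becomes
      // true. Either way the thread is blocked, not burning a core.
      cv_.wait_for(l, period_, [this] { return closing_; });
    }
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      closing_ = true;
    }
    cv_.notify_all();
  }

 private:
  const std::chrono::milliseconds period_;
  std::mutex lock_;
  std::condition_variable cv_;
  bool closing_ = false;
};
```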


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@215
PS1, Line 215:         const auto& id_and_call = call_by_id_.begin();
             :         const auto& oldest_call = id_and_call->second;
             :         if (now > GetDeadline(oldest_call)) {
             :           timed_out_call = oldest_call;
             :           call_by_id_.erase(id_and_call);
             :         }
             :       }
This is an approximation, right? Nothing strictly guarantees that the first 
map entry is also the call with the earliest deadline; scheduler vagaries 
across threads in Execute() can lead to slight out-of-order conditions. 
To be clear, that's totally OK because the deadline checking is best effort, 
but I think it's something we should document here.

One change we may want to make is for the deadline checker to cancel a group of 
calls per wakeup. That is, start at the beginning of call_by_id_ and keep 
pulling off calls until you find one whose deadline is still valid. Then cancel 
them all en masse.
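
A sketch of the batched variant, assuming a hypothetical DeadlineTracker that holds in-flight calls in a std::map keyed by increasing call id (as call_by_id_ appears to be). Expired calls are pulled off the front under the lock and returned, so the caller can cancel them all outside it:

```cpp
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for an in-flight call.
struct PendingCall {
  Clock::time_point deadline;
  std::string name;
};

class DeadlineTracker {
 public:
  void Add(int64_t id, PendingCall call) {
    std::lock_guard<std::mutex> l(lock_);
    call_by_id_.emplace(id, std::move(call));
  }

  // Removes calls from the front of the id-ordered map while their deadline
  // has passed, stopping at the first one that has not expired. Ids follow
  // submission order, so this only approximates deadline order.
  std::vector<PendingCall> TakeExpired(Clock::time_point now) {
    std::vector<PendingCall> expired;
    std::lock_guard<std::mutex> l(lock_);
    auto it = call_by_id_.begin();
    while (it != call_by_id_.end() && now > it->second.deadline) {
      expired.push_back(std::move(it->second));
      it = call_by_id_.erase(it);
    }
    return expired;
  }

  size_t size() const {
    std::lock_guard<std::mutex> l(lock_);
    return call_by_id_.size();
  }

 private:
  mutable std::mutex lock_;
  std::map<int64_t, PendingCall> call_by_id_;
};
```

Because the scan stops at the first unexpired entry, a call with an early deadline that sits behind a later-deadline call is only caught on a subsequent pass, which is the best-effort behavior worth documenting.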


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@232
PS1, Line 232:     if (!outbound_call_queue_.BlockingGet(&call)) {
Maybe use BlockingDrainTo to reduce the cost of waking up when under heavy load?


http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@253
PS1, Line 253:     QueueStatus queue_status = outbound_call_queue_.Put(call);
Seems reasonable to add optional deadlines to BlockingPut and BlockingGet, a la 
BlockingDrainTo. That would make this more efficient by obviating the need for 
sleeping, and you wouldn't need dedicated code on L248-250 to check for a 
timeout before enqueuing either.
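
A sketch of what a deadline-aware BlockingPut might look like, using std::condition_variable::wait_until on a hypothetical BoundedQueue (not Kudu's BlockingQueue; PutStatus is also made up here):

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical result codes for this sketch only.
enum class PutStatus { kOk, kTimedOut, kShutdown };

template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

  // Blocks until there is room, the deadline passes, or the queue is shut
  // down. The timed wait enforces the deadline, so the caller needs neither
  // a sleep/retry loop nor a separate "already expired?" check up front.
  PutStatus BlockingPut(T t, std::chrono::steady_clock::time_point deadline) {
    std::unique_lock<std::mutex> l(lock_);
    bool ready = not_full_.wait_until(l, deadline, [this] {
      return shutdown_ || q_.size() < capacity_;
    });
    if (shutdown_) return PutStatus::kShutdown;
    if (!ready) return PutStatus::kTimedOut;
    q_.push_back(std::move(t));
    return PutStatus::kOk;
  }

  // Non-blocking consumer side; a BlockingGet with a deadline would mirror
  // BlockingPut using a second "not empty" condition variable.
  bool TryGet(T* out) {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (q_.empty()) return false;
      *out = std::move(q_.front());
      q_.pop_front();
    }
    not_full_.notify_one();  // Room freed up; wake one waiting producer.
    return true;
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    not_full_.notify_all();  // Wake all blocked producers so they can bail.
  }

 private:
  const size_t capacity_;
  std::mutex lock_;
  std::condition_variable not_full_;
  std::deque<T> q_;
  bool shutdown_ = false;
};
```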



--
To view, visit http://gerrit.cloudera.org:8080/15185

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Id611e1c683df2721fd058f753b8686a688a5990d
Gerrit-Change-Number: 15185
Gerrit-PatchSet: 1
Gerrit-Owner: Andrew Wong <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Tidy Bot (241)
Gerrit-Comment-Date: Wed, 12 Feb 2020 20:20:48 +0000
Gerrit-HasComments: Yes
