Adar Dembo has posted comments on this change. ( http://gerrit.cloudera.org:8080/15185 )
Change subject: [cpp] KUDU-2971: protobuf-based wrapper for subprocesses
......................................................................


Patch Set 1:

(24 comments)

May need to modify the dist-test infra to get this working there.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/CMakeLists.txt
File src/kudu/subprocess/CMakeLists.txt:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/CMakeLists.txt@38
PS1, Line 38: execute_process(COMMAND ./gradlew :kudu-subprocess-echo:jar
I don't think you want to use execute_process() here, as it means this code runs at configure time, whenever cmake is run. What you actually want is to link this into the dependency graph so that if you build kudu_subprocess or subprocess_server-test, you also build the Java subprocess stuff. Look into add_custom_command() for that.

But shouldn't we be using add_jar() from the UseJava module (loaded after FindJava)? See src/kudu/hms/CMakeLists.txt (and https://cmake.org/cmake/help/latest/module/UseJava.html).

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h
File src/kudu/subprocess/call.h:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@34
PS1, Line 34: class SubprocessCall {
The thread safety semantics aren't super clear, nor is it obvious from usage just _how_ thread safe this needs to be. Instinctively I think it's safer than it needs to be (i.e. maybe some of the "if !cb_ then return" checks can be converted into DCHECKs?), but I'm not quite sure.

Also, as a helper class for the subprocess server, why is this exposed outside of subprocess/server.cc at all? Seems like an implementation detail.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@47
PS1, Line 47: const MonoTime&
MonoTime is just a wrapper around an int64_t, so return it by value.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@71
PS1, Line 71: SubprocessResponsePB&&
The GSG (Google C++ Style Guide) forbids rvalue references outside of obvious usage (i.e. move constructors, move assignment operators, etc.). Just pass by value and ensure that callers use std::move(); clang-tidy will tell you if you did it wrong.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@97
PS1, Line 97: This is important because the callback may
            : // destroy the request.
I don't see how that can be safe given L67, L79, and L90, all of which access internal state _after_ invoking the callback.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/call.h@99
PS1, Line 99: mutable simple_spinlock lock_;
Given that this is held while invoking the callback, and that the callback may do IO, it seems better to use a sleeping mutex.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.h
File src/kudu/subprocess/server.h:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.h@42
PS1, Line 42: class SubprocessCall;
Shouldn't be necessary given you're including call.h. Then again, why is call.h being included at all?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc
File src/kudu/subprocess/server.cc:

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@44
PS1, Line 44:
            : DEFINE_int32(subprocess_request_queue_size_bytes, 4 * 1024 * 1024,
            :              "Maximum size in bytes of the outbound request queue");
            : TAG_FLAG(subprocess_request_queue_size_bytes, advanced);
            :
            : DEFINE_int32(subprocess_response_queue_size_bytes, 4 * 1024 * 1024,
            :              "Maximum size in bytes of the inbound response queue");
            : TAG_FLAG(subprocess_response_queue_size_bytes, advanced);
What happens if a single request or response exceeds this amount? Does the BlockingQueue allow the element to be added to the queue anyway, or does it fail?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@64
PS1, Line 64: add enqueue
Nit: duplicative.
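The call.h comments at @71, @97 and @99 fit together into one small pattern. Below is a minimal sketch, assuming a std::function callback and using std::mutex in place of Kudu's own lock types; HypotheticalCall and FakeResponsePB are made-up names, not the patch's SubprocessCall or SubprocessResponsePB. Besides taking the response by value and using a sleeping mutex, it releases the lock before invoking the callback and touches no member afterwards. That is one possible way (not necessarily the author's) to make "the callback may destroy the request" actually hold.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for SubprocessResponsePB; not the real protobuf type.
struct FakeResponsePB {
  std::string payload;
};

// Hypothetical call object (not the patch's SubprocessCall) sketching:
//   1. the response is taken by value, so callers std::move() into it
//      instead of the API exposing an rvalue-reference parameter;
//   2. a sleeping std::mutex guards the state instead of a spinlock;
//   3. the callback is the last thing that runs, so it may destroy the call.
class HypotheticalCall {
 public:
  using Callback = std::function<void(const FakeResponsePB&)>;

  // The callback is moved into the call rather than stored as a pointer.
  explicit HypotheticalCall(Callback cb) : cb_(std::move(cb)) {}

  // Returns false if the call was already responded to (e.g. timed out).
  bool Respond(FakeResponsePB resp) {
    Callback cb;
    {
      std::lock_guard<std::mutex> l(lock_);
      if (!cb_) {
        return false;
      }
      cb = std::move(cb_);
      cb_ = nullptr;  // a moved-from std::function is unspecified; clear it
    }
    // The lock is released and no member is touched from here on, so the
    // callback is free to do IO or even delete this object.
    cb(resp);
    return true;
  }

 private:
  std::mutex lock_;
  Callback cb_;
};
```

Because the callback runs outside the critical section, the lock is only held long enough to swap out the callback; whether that makes a spinlock acceptable again is a judgment call for the real class.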

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@112
PS1, Line 112: RETURN_NOT_OK(Thread::Create("subprocess", "reader", &SubprocessServer::ReceiveMessagesTask,
             :                              this, &read_thread_));
             : RETURN_NOT_OK(Thread::Create("subprocess", "writer", &SubprocessServer::SendMessagesTask,
             :                              this, &write_thread_));
             : RETURN_NOT_OK(Thread::Create("subprocess", "deadline-checker",
             :                              &SubprocessServer::CheckDeadlinesTask,
             :                              this, &deadline_checker_));
Nit: the "Task" suffix suggests a one-time operation rather than an active thread. Maybe use "Thread" suffixes?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@128
PS1, Line 128: shared_ptr<SubprocessCall> call(new SubprocessCall(req, resp, &cb));
Could we use make_shared here? Could we move cb into the call instead of storing a pointer to it?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@134
PS1, Line 134: if (closing_) {
             :     return;
             :   }
             :   // Stop further work from happening by killing the subprocess and shutting
             :   // down the queues.
             :   closing_.store(true);
Should use a CAS to avoid multiple threads racing on Shutdown and executing it more than once.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@140
PS1, Line 140: WARN_NOT_OK(process_->KillAndWait(SIGTERM), "failed to stop subprocess");
             :   response_queue_.Shutdown();
             :   outbound_call_queue_.Shutdown();
Curious as to whether there's a specific shutdown order we must follow, and why.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@165
PS1, Line 165: DCHECK(message_protocol_) << "message protocol is not initialized";
Want to mirror this in SendMessagesTask?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@171
PS1, Line 171: // The underlying pipe was closed. We're likely shutting down.
If we get an EOF, is closing_ going to be true? What's the relationship between EOF and closing_?
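For the @134 comment, here is a minimal sketch of a CAS-guarded Shutdown(), assuming closing_ is a std::atomic<bool>. HypotheticalServer and shutdown_runs_ are illustrative names only, and the real shutdown steps (killing the subprocess, shutting down the queues) are reduced to a counter.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical skeleton (not the patch's SubprocessServer) showing only an
// idempotent Shutdown() built on compare-and-swap.
class HypotheticalServer {
 public:
  void Shutdown() {
    bool expected = false;
    // Exactly one caller can flip closing_ from false to true. Any other
    // caller, concurrent or later, fails the CAS and returns immediately.
    // A separate load() followed by store(true) lets two threads both pass.
    if (!closing_.compare_exchange_strong(expected, true)) {
      return;
    }
    // Placeholder for the real work: killing the subprocess and shutting
    // down the request/response queues.
    shutdown_runs_.fetch_add(1);
  }

  bool closing() const { return closing_.load(); }
  int shutdown_runs() const { return shutdown_runs_.load(); }

 private:
  std::atomic<bool> closing_{false};
  std::atomic<int> shutdown_runs_{0};
};
```

For a plain bool flag, `if (closing_.exchange(true)) return;` is an equivalent, slightly shorter form. Note that a losing caller may return before the winner has finished shutting down; if callers must wait for completion, something more (e.g. joining the worker threads) is still needed.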

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@174
PS1, Line 174: WARN_NOT_OK(s, "failed to receive response from the subprocess");
Should this be a fatal error? Why or why not?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@176
PS1, Line 176: // The queue has been shut down and we should shut down too.
As with the EOF case above, should we assert anything about closing_?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@186
PS1, Line 186: if (!response_queue_.BlockingGet(&resp)) {
Consider BlockingDrainTo to reduce the number of wakeups under heavy load.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@188
PS1, Line 188: return;
Assert on closing_?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@191
PS1, Line 191: LOG(WARNING) << Substitute("Received invalid response: $0",
             :                                  pb_util::SecureDebugString(resp));
             :       continue;
Hmm, shouldn't we CHECK/DCHECK on this? Seems like it'd indicate a programming error on our part.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@209
PS1, Line 209: while (!closing_.load()) {
This is a tight loop, which probably isn't what you wanted.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@215
PS1, Line 215: const auto& id_and_call = call_by_id_.begin();
             :         const auto& oldest_call = id_and_call->second;
             :         if (now > GetDeadline(oldest_call)) {
             :           timed_out_call = oldest_call;
             :           call_by_id_.erase(id_and_call);
             :         }
             :       }
This is an approximation, right? Nothing strictly guarantees that the first map entry is also the oldest call by deadline; scheduler vagaries across threads in Execute() can lead to slight out-of-order conditions. To be clear, that's totally OK because the deadline checking is best effort, but I think it's something we should document here.

One change we may want to make is for the deadline checker to cancel a group of calls per wakeup. That is, start at the beginning of call_by_id_ and keep pulling off calls until you find one whose deadline is still valid. Then cancel them all en masse.

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@232
PS1, Line 232: if (!outbound_call_queue_.BlockingGet(&call)) {
Maybe use BlockingDrainTo to reduce the cost of waking up when under heavy load?

http://gerrit.cloudera.org:8080/#/c/15185/1/src/kudu/subprocess/server.cc@253
PS1, Line 253: QueueStatus queue_status = outbound_call_queue_.Put(call);
Seems reasonable to add optional deadlines to BlockingPut and BlockingGet, a la BlockingDrainTo. That would make this more efficient by obviating the need for sleeping, and you wouldn't need dedicated code on L248-250 to check for a timeout before enqueuing either.

--
To view, visit http://gerrit.cloudera.org:8080/15185
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Id611e1c683df2721fd058f753b8686a688a5990d
Gerrit-Change-Number: 15185
Gerrit-PatchSet: 1
Gerrit-Owner: Andrew Wong <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Tidy Bot (241)
Gerrit-Comment-Date: Wed, 12 Feb 2020 20:20:48 +0000
Gerrit-HasComments: Yes
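The deadline-checker suggestions at @209 and @215 (cancel a whole group of expired calls per wakeup, accepting best-effort ordering) can be sketched as a helper that drains the expired prefix of the map. This assumes call ids are handed out in roughly deadline order (e.g. every call shares the same timeout); PendingCall, CallMap and TakeExpiredCalls are hypothetical names, and std::chrono::steady_clock stands in for Kudu's MonoTime.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical in-flight call record; the patch's SubprocessCall differs.
struct PendingCall {
  int64_t id;
  Clock::time_point deadline;
};

using CallMap = std::map<int64_t, std::shared_ptr<PendingCall>>;

// Removes and returns every expired call at the front of `call_by_id`,
// stopping at the first call whose deadline has not yet passed. Ids are
// assumed to increase roughly (not strictly) in deadline order, so this is
// best effort: an expired call sitting behind a live one is only picked up
// on a later pass.
std::vector<std::shared_ptr<PendingCall>> TakeExpiredCalls(CallMap* call_by_id,
                                                           Clock::time_point now) {
  std::vector<std::shared_ptr<PendingCall>> expired;
  auto it = call_by_id->begin();
  while (it != call_by_id->end() && now > it->second->deadline) {
    expired.push_back(std::move(it->second));
    it = call_by_id->erase(it);
  }
  return expired;
}
```

A checker thread could take the map's lock, call TakeExpiredCalls(), drop the lock, fail each returned call with a timeout, and then sleep (or wait with a timeout) before the next pass instead of spinning, which would also address the tight loop at @209.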
