Henry Robinson has posted comments on this change.

Change subject: IMPALA-{4670,4672,4784}: Add RpcMgr and port Statestore services to KRPC
......................................................................

Patch Set 3:

(28 comments)

Although it might look like a lot has changed in this new patch, there are only two big changes, and both are relatively minor:

1. Rewrite test_statestore.py in C++ (see statestore-test.cc) to make sure we don't lose any of that coverage.
2. Remove InProcessStatestore, which was unused and causing pain to keep up-to-date.

http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/common.proto
File be/src/rpc/common.proto:

Line 27: message ThriftWrapperPB { required bytes thrift_struct = 1; }
> a brief comment regarding the purpose would be good.

Done


http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc-mgr.cc
File be/src/rpc/rpc-mgr.cc:

Line 44: DEFINE_int32(num_acceptor_threads, 2, "Number of threads dedicated to accepting "
> why so low?

Acceptor threads don't do much (just create a socket and pass it to the reactor threads). This is unlike Thrift, where the acceptor thread also did the SASL negotiation, which was expensive and harmed throughput. So really this is double the number of Thrift threads, and each will have less work to do per connection. We expect the number of connections to be much lower with KRPC as well.

PS3, Line 52: 4
> Yeah, this should be plumbed through, will do that.

Done

Line 59: DCHECK(is_inited()) << "Must call Init() before RegisterService()";
> dcheck that services haven't been started yet, or does that not matter?

Done

Line 62: new ServicePool(move(scoped_ptr), messenger_->metric_entity(), service_queue_depth);
> why not just pass service_ptr.release()?

I can't quite do that because the gscoped_ptr<T>(T*) c'tor is explicit. I can do: new ServicePool(gscoped_ptr<ServiceIf>(service_ptr.release())

PS3, Line 64: service_pool->Init
> good point. if the init call fails, do we abort impalad startup? (i would a

Yes indeed (and statestore startup).

PS3, Line 74: int32_t num_acceptor_threads
> so then move the flag to where it's used?

It's used in several places (anywhere that has an RpcMgr might want to use the flag).
So I think the DEFINE(..) here, DECLARE(...) in modules that want to use it is the right pattern.


http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc-mgr.h
File be/src/rpc/rpc-mgr.h:

PS3, Line 41: 0
> what does it mean to manage 0 services?

It means that there are 0 services managed by the RpcMgr? Since the RpcMgr also helps get proxies, you don't have to register a service for it to be useful.

PS3, Line 49: CLIENTS
> why define new terminology? isn't this just "proxy" in KuduRPC speak?

I've tried to standardize a bit better. 'clients' and 'servers' are pretty established nouns in this space though.

PS3, Line 52: GetProxy
> so I guess RpcMgr deals with both the server and client side of things? As

See line 36: "Central manager for all RPC services and clients".

A service is a network-exported set of methods that can be called using the KRPC protocol. Speaking precisely, a service 'definition' is the set of methods (defined in a protobuf), but a service is the instantiation of that service definition on a particular machine. It's a server-side only construct.

A proxy allows clients to easily call methods on one service, and so they share the same set of method signatures (i.e. they refer to the same service definition).

Typically a proxy and a service are used from different machines, and KRPC does not check that the remote service has been started before creating a proxy object. Connectivity / service availability checks are performed when the actual RPC method is invoked.

Line 64: /// RpcMgr::RegisterService() before RpcMgr::StartServices() is called.
> does init() go before register()?

Done

Line 111: /// Creates a new proxy for a remote service of type P at location 'address', and places
> might want to point out here what P needs to be (auto-generated from a serv

Done


http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc-mgr.inline.h
File be/src/rpc/rpc-mgr.inline.h:

Line 39: kudu::HostPort(address.hostname, address.port).ResolveAddresses(&addresses),
> how expensive is this?

It's a DNS subsystem call. Without nscd (the name service caching daemon) running, the mean response time was ~5ms (although the 99.9th percentile was lower - there are larger timeout events that are expensive). However, nscd brings the mean response time down to about 10us per invocation.

We (Cloudera) recommend that users have nscd running for CDH deployments; we should make sure that this is emphasised for Impala as well (it would make a difference whether or not KRPC is used). nscd also seems to reduce the frequency of outliers.

Line 41: proxy->reset(new P(messenger_, addresses[0]));
> dcheck that addresses isn't empty?

Done


http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:

PS3, Line 36: Clients
> over in rpc-mgr.h, you seemed to use "client" as a synonym for proxy, but w

I've reworded rpc-mgr.h to standardize on Proxy. I'll write 'callers' here to avoid ambiguity.

'A single RPC instance' is a bit confusing, so I've reworded it. It's intended to mean a single invocation of a remote method. The point at which you consider a method 'invoked' seems to be causing some confusion: here I mean the point at which application logic actually gets called on the server side.

PS3, Line 48: Rpc
> I was confused about this class at first since the name sounds like it's th

Retry semantics are often application-specific. For example, Kudu has different retry requirements than we do: they want to fail over to different replicas. (They do have a retry object, but it's very Kudu-specific).
I don't think it would help to call this a Wrapper class. From the perspective of the caller, this class presents the abstraction of an RPC: the caller invokes a method (via Execute()), and gets a response.

Line 51: /// 'RESP' (must both be protobuf).
> unclear what req and resp refer to

Outdated comment, done.

Line 58: Rpc(const TNetworkAddress& remote, RpcMgr* mgr) : remote_(remote), mgr_(mgr) {}
> does this need to be public?

Done

Line 68: Rpc& SetMaxAttempts(int32_t max_attempts) {
> does this need to be specific (int32_t as opposed to int)?

It doesn't have to be, but I thought in general we tried to be explicit. Can revert it to an int if preferred.

PS3, Line 90: func
> point out in what way F is constrained

Done

Line 91: if (max_rpc_attempts_ <= 1) {
> why not dcheck in set..()? there's no reason this should be a runtime error

Typo - should be <, not <=.

Re: DCHECK vs runtime error - I can see these parameters being controlled by flags (e.g. the timeout for the data stream sender), so they will need to have runtime checks.

Line 112: if (!controller.status().IsRemoteError()) break;
> comment on significance of remote error. looks like you're getting a non-ok

I've rewritten this to make it a bit clearer. I've introduced IsRetryableError() (which is also needed in a subsequent patch) to abstract the logic that decides whether a retry is needed.

Line 143: /// TODO(KRPC): Warn on uninitialized timeout, or set meaningful default.
> why warn? if you don't set a timeout, it shouldn't time out.

I think we should just set a default. It's now a *really* bad idea to have RPCs without timeouts, because of the limited set of service threads.

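To make the retry discussion for rpc.h (Line 91 and Line 112) concrete, here is a minimal, self-contained sketch of a bounded retry loop. It is not the code in this patch: the Status struct, this version of IsRetryableError(), and ExecuteWithRetries() are hypothetical stand-ins with no KRPC dependency. It assumes the attempt limit may come from a flag, so a bad value is reported as a runtime error rather than caught by a DCHECK.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a status type; not Impala's or Kudu's Status.
struct Status {
  enum Code { OK, RETRYABLE, FATAL };
  Code code;
  std::string msg;
  Status() : code(OK) {}
  Status(Code c, std::string m) : code(c), msg(std::move(m)) {}
  bool ok() const { return code == OK; }
};

// Hypothetical policy hook: only errors marked RETRYABLE are worth another
// attempt. A real implementation would inspect the RPC controller's status.
inline bool IsRetryableError(const Status& s) {
  return s.code == Status::RETRYABLE;
}

// Calls 'attempt' until it succeeds, fails with a non-retryable error, or
// 'max_attempts' calls have been made. Returns the status of the last call.
inline Status ExecuteWithRetries(int max_attempts,
                                 const std::function<Status()>& attempt) {
  // Runtime check (not a debug-only assertion): the limit may be set by a flag.
  if (max_attempts < 1) {
    return Status(Status::FATAL, "max_attempts must be at least 1");
  }
  Status last;
  for (int i = 0; i < max_attempts; ++i) {
    last = attempt();
    if (last.ok() || !IsRetryableError(last)) break;
  }
  return last;
}
```

A caller would pass a lambda that issues one RPC and converts its outcome into a Status. Keeping the retry decision in a single predicate means the loop itself does not change if the set of retryable errors grows in a later patch.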
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc_test.proto
File be/src/rpc/rpc_test.proto:

Line 18: package impala;
> add comment describing what this is used for

Done


http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/runtime/exec-env.h
File be/src/runtime/exec-env.h:

Line 159: TNetworkAddress subscriber_address_;
> same address as the generic backend service, meaning it'll be removed once

Yes - this will go away.


http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/statestore/statestore-subscriber.cc
File be/src/statestore/statestore-subscriber.cc:

Line 77: class StatestoreSubscriberImpl : public StatestoreSubscriberIf {
> calling it 'impl' isn't super-meaningful (impl of what?). would be nice to

How about just 'service'? ServiceImpl is a bit clunky, and there aren't going to be multiple implementations of the *If class.

Line 207: return Status("--statestore_subscriber_num_svc_threads must be at least 2");
> why not just bump it up to 2?

I prefer not to surprise users by silently changing flags that are set wrong. I suspect no-one's going to set this lower.


http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/statestore/statestore-subscriber.h
File be/src/statestore/statestore-subscriber.h:

Line 128: TNetworkAddress subscriber_address_;
> wouldn't that be the same as the generic backend address?

Yes - when the ImpalaInternalService is ported to KRPC, this should go away.

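For the Line 207 point in statestore-subscriber.cc, a small sketch of the reject-rather-than-adjust approach to flag validation. ValidateMinThreads() is a hypothetical helper, and it returns a plain string instead of Impala's Status so that the example stands alone; the message format mirrors the one quoted above.

```cpp
#include <string>

// Hypothetical helper: returns an empty string when 'value' is acceptable,
// otherwise an error message naming the flag. It never modifies the value,
// so a misconfigured flag is surfaced to the operator instead of being
// silently raised to the minimum.
inline std::string ValidateMinThreads(const std::string& flag_name, int value,
                                      int min_value) {
  if (value >= min_value) return "";
  return "--" + flag_name + " must be at least " + std::to_string(min_value);
}
```

Startup code would call this once per flag and abort with the returned message if it is non-empty.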
--
To view, visit http://gerrit.cloudera.org:8080/5720
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I8dbf12b9ecd71d26d239d31c19b487175194c766
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: David Knupp <dkn...@cloudera.com>
Gerrit-Reviewer: Henry Robinson <he...@cloudera.com>
Gerrit-Reviewer: Juan Yu <j...@cloudera.com>
Gerrit-Reviewer: Marcel Kornacker <mar...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Reviewer: Taras Bobrovytsky <tbobrovyt...@cloudera.com>
Gerrit-HasComments: Yes