Henry Robinson has posted comments on this change.
Change subject: IMPALA-{4670,4672,4784}: Add RpcMgr and port Statestore
services to KRPC
......................................................................
Patch Set 3:
(28 comments)
Although it might look like a lot has changed in this new patch, there are only
two relatively minor big changes:
1. Rewrite test_statestore.py in C++ (see statestore-test.cc) to make sure we
don't lose any of that coverage.
2. Remove InProcessStatestore which was unused and causing pain to keep
up-to-date.
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/common.proto
File be/src/rpc/common.proto:
Line 27: message ThriftWrapperPB { required bytes thrift_struct = 1; }
> a brief comment regarding the purpose would be good.
Done
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc-mgr.cc
File be/src/rpc/rpc-mgr.cc:
Line 44: DEFINE_int32(num_acceptor_threads, 2, "Number of threads dedicated to
accepting "
> why so low?
Acceptor threads don't do much (just create a socket and pass it to the reactor
threads). This is unlike Thrift where the acceptor thread also did the SASL
negotiation, which was expensive and harmed throughput. So really this is
double the number of Thrift threads, and each will have less work to do per
cnxn.
We expect the number of connections to be much lower with KRPC as well.
PS3, Line 52: 4
> Yeah, this should be plumbed through, will do that.
Done
Line 59: DCHECK(is_inited()) << "Must call Init() before RegisterService()";
> dcheck that services haven't been started yet, or does that not matter?
Done
Line 62: new ServicePool(move(scoped_ptr), messenger_->metric_entity(),
service_queue_depth);
> why not just pass service_ptr.release()?
I can't quite do that because the gscoped_ptr<T>(T*) c'tor is explicit. I can
do:
new ServicePool(gscoped_ptr<ServiceIf>(service_ptr.release())
PS3, Line 64: service_pool->Init
> good point. if the init call fails, do we abort impalad startup? (i would a
yes indeed (and statestore startup).
PS3, Line 74: int32_t num_acceptor_threads
> so then move the flag to where it's used?
it's used in several places (anywhere that has an RpcMgr might want to use the
flag). So I think the DEFINE(..) here, DECLARE(...) in modules that want to use
it is the right pattern.
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc-mgr.h
File be/src/rpc/rpc-mgr.h:
PS3, Line 41: 0
> what does it mean to manage 0 services?
It means that there are 0 services managed by the RpcMgr? Since the RpcMgr also
helps get proxies, you don't have to register a service for it to be useful.
PS3, Line 49: CLIENTS
> why define new terminology? isn't this just "proxy" in KuduRPC speak?
I've tried to standardize a bit better. 'clients' and 'servers' are pretty
established nouns in this space though.
PS3, Line 52: GetProxy
> so I guess RpcMgr deals with both the server and client side of things? As
See line 36: "Central manager for all RPC services and clients".
A service is a network-exported set of methods that can be called using the
KRPC protocol. Speaking precisely, a service 'definition' is the set of methods
(defined in a protobuf), but a service is the instantiation of that service
definition on a particular machine. It's a server-side only construct.
A proxy allows clients to easily call methods on one service, and so they share
the same set of method signatures (i.e. they refer to the same service
definition).
Typically a proxy and a service are used from different machines, and KRPC does
not check that the remote service has been started before creating a proxy
object. Connectivity / service availability checks are performed when the
actual RPC method is invoked.
Line 64: /// RpcMgr::RegisterService() before RpcMgr::StartServices() is called.
> does init() go before register()?
Done
Line 111: /// Creates a new proxy for a remote service of type P at location
'address', and places
> might want to point out here what P needs to be (auto-generated from a serv
Done
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc-mgr.inline.h
File be/src/rpc/rpc-mgr.inline.h:
Line 39: kudu::HostPort(address.hostname,
address.port).ResolveAddresses(&addresses),
> how expensive is this?
It's a DNS subsystem call.
Without the nscd (nameservice caching daemon) running, the mean response time
was ~5ms (although the 99.9th percentile was lower - there are larger timeout
events that are expensive).
However, nscd brings the mean response time down to about 10us per invocation.
We (Cloudera) recommend that users have nscd running for CDH deployments; we
should make sure that that is emphasised for Impala as well (it would make a
difference no matter if KRPC was used or not).
nscd also seems to reduce the frequency of outliers as well.
Line 41: proxy->reset(new P(messenger_, addresses[0]));
> dcheck that addresses isn't empty?
Done
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc.h
File be/src/rpc/rpc.h:
PS3, Line 36: Clients
> over in rpc-mgr.h, you seemed to use "client" as a synonym for proxy, but w
I've reworded rpc-mgr.h to standardize on Proxy. I'll write 'callers' here to
avoid ambiguity.
'A single RPC instance' is a bit confusing, I've reworded. But it's intended to
mean a single invocation of a remote method. The point at which you consider a
method 'invoked' seems to be causing some confusion: here I mean to be the
point at which application logic actually gets called on the server side.
PS3, Line 48: Rpc
> I was confused about this class at first since the name sounds like it's th
Retry semantics are often application-specific. For example, Kudu have
different retry requirements than us - for example, they want to fail over to
different replicas. (They do have a retry object, but it's very Kudu-specific).
I don't think it would help to call this a Wrapper class. From the perspective
of the caller, this class presents the abstraction of an RPC: the caller
invokes a method (via Execute()), and gets a response.
Line 51: /// 'RESP' (must both be protobuf).
> unclear what req and resp refer to
Outdated comment, done.
Line 58: Rpc(const TNetworkAddress& remote, RpcMgr* mgr) : remote_(remote),
mgr_(mgr) {}
> does this need to be public?
Done
Line 68: Rpc& SetMaxAttempts(int32_t max_attempts) {
> does this need to be specific (int32_t as opposed to int)?
It doesn't have to be, but I thought in general we tried to be explicit. Can
revert it to an int if preferred.
PS3, Line 90: func
> point out in what way F is constrained
Done
Line 91: if (max_rpc_attempts_ <= 1) {
> why not dcheck in set..()? there's no reason this should be a runtime error
Typo - should be <, not <=.
Re: DCHECK vs runtime error - I can see these parameters being controlled by
flags (e.g the timeout for the data stream sender), so they will need to have
runtime checks.
Line 112: if (!controller.status().IsRemoteError()) break;
> comment on significance of remote error. looks like you're getting a non-ok
Rewritten this to make it a bit clearer. I've introduced IsRetryableError()
(which is also needed in a subsequent patch) to abstract the logic to decide if
a retry is needed.
Line 143: /// TODO(KRPC): Warn on uninitialized timeout, or set meaningful
default.
> why warn? if you don't set a timeout, it shouldn't time out.
I think we should just set a default. It's now a *really* bad idea to have RPCs
without timeouts, because of the limited set of service threads.
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/rpc/rpc_test.proto
File be/src/rpc/rpc_test.proto:
Line 18: package impala;
> add comment describing what this is used for
Done
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/runtime/exec-env.h
File be/src/runtime/exec-env.h:
Line 159: TNetworkAddress subscriber_address_;
> same address as the generic backend service, meaning it'll be removed once
Yes - this will go away.
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/statestore/statestore-subscriber.cc
File be/src/statestore/statestore-subscriber.cc:
Line 77: class StatestoreSubscriberImpl : public StatestoreSubscriberIf {
> calling it 'impl' isn't super-meaningful (impl of what?). would be nice to
How about just 'service'? ServiceImpl is a bit clunky, and there aren't going
to be multiple implementations of the *If class.
Line 207: return Status("--statestore_subscriber_num_svc_threads must be
at least 2");
> why not just bump it up to 2?
I prefer to avoid surprise by changing flags if they're set wrong. I suspect
no-one's going to set this lower.
http://gerrit.cloudera.org:8080/#/c/5720/3/be/src/statestore/statestore-subscriber.h
File be/src/statestore/statestore-subscriber.h:
Line 128: TNetworkAddress subscriber_address_;
> wouldn't that be the same as the generic backend address?
Yes - when the ImpalaInternalService is ported to KRPC, this should go away.
--
To view, visit http://gerrit.cloudera.org:8080/5720
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I8dbf12b9ecd71d26d239d31c19b487175194c766
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <[email protected]>
Gerrit-Reviewer: Dan Hecht <[email protected]>
Gerrit-Reviewer: David Knupp <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Juan Yu <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-Reviewer: Taras Bobrovytsky <[email protected]>
Gerrit-HasComments: Yes