Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................


Patch Set 3:

(30 comments)

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

PS3, Line 327: VLOG_QUERY << "DataStreamMgr maintenance tasks complete. Took: "
             :                << PrettyPrinter::Print(MonotonicMillis() - 
start, TUnit::TIME_MS);
> Do you think it's worth printing what maintenance tasks were done in this i
Removed for now.


http://gerrit.cloudera.org:8080/#/c/5888/2/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS2, Line 59: In the first phase the sender initiates 
removed


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

Line 60: /// first batch. Since the sender may start sending before the 
receiver is ready, the data
> we could relatively easily remove that restriction, the execrpc changes are
Any change I can think of would be substantial (two-phase start-up protocol) or 
bug-prone (waiting).


Line 77: /// fixed-size buffer for later processing, or discard the batch if 
the buffer is full. In
> how can the is-full case come up? shouldn't flow control keep the sender fr
No, for a couple of reasons:

1. Flow control doesn't kick in until the first round of batches has been 
sent, so all senders try to send batches at once. 

2. In the steady-state case with a receiver that's faster than the senders, 
there'll be no queuing. But if the receiver suddenly stalls there can be many 
senders still preparing a new batch, and together they can fill up the queue. 

3. The queue is limited by the byte size of the batches. The next batch might 
be really large and overflow the queue, and we can't predict that in general.


Line 84: /// queue. An error code is returned which causes the sender to retry 
the previous batch.
> sounds complicated. is that really necessary?
I'm not sure which part you're referring to - but yes, in general this is how 
the flow control is implemented. The pending sender list replaces the set of 
blocked threads from Thrift, and is lighter weight because it gives us the 
ability to discard the row batch payload.

The error code is used to signal to the RPC layer on the sender side that the 
RPC should be retried. This has the benefit of making the retry transparent to 
the caller code.


Line 113: /// time-out and cancel itself. However, it is usual that the 
coordinator will initiate
> i guess that extra complication is necessary because the non-existence of a
Exactly.


Line 124: /// The sender has a default timeout of 2 minutes for TransmitData() 
calls. If the timeout
> why so long?
The former case is detected almost every time: if the receiver has shown up 
and been closed, the sender will find it in the closed stream cache.


Line 131: /// immediately. Both of these cases are designed so that the sender 
can prepare a new
> in the queued case, it seems like the response should go out when the queue
In the queued batch case, we could look to see if the queue had any more 
capacity before responding to the sender. However, it's hard to know how much 
capacity a sender needs for its next batch (as batches with strings can vary in 
size a lot). This strategy is optimistic - if the receiver is able to accept a 
batch, we presume that it can accept the next one as well. In my tests, this 
worked better than pessimistically pausing senders until we knew for sure there 
was room for their particular batch.


PS3, Line 135: idea
> I would rather use the word 'assumption' here.
Done


Line 137: /// notification should not exceed 60s and so the sender will not 
time out.
> that also sounds complicated.
The sender needs a timeout, after which it will fail, so the receiver has to 
try to respond within that timeout. That's all that's going on here: pick a 
timeout, then pick a response time that (modulo standard distributed-systems 
issues) is very likely to be less than that timeout. This avoids false 
negatives.


PS3, Line 212: TRANSMIT_DATA_TIMEOUT_SECONDS
> I think there needs to be some comment clearly distinguishing between this 
I removed this because it was so similar to datastream_sender_timeout_ms that 
it made sense to just use the flag for both the initial timeout, and the 
row-batch processing timeout.


Line 245:   void AddData(const TUniqueId& fragment_instance_id, 
TransmitDataCtx&& payload);
> in other words, pass in a pointer and stipulate that AddData owns it.
I feel that rvalue references do make ownership explicit. You can't pass a 
non-temporary value without move(X), which makes the transfer really clear.

I think it's better to make it clear from the code (you cannot pass a 
TransmitDataCtx without relinquishing ownership) than it is to use the comments 
to convey a convention.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 129: pending_senders_
> Maybe this could be future work, but I foresee a need to cap this at some n
Hm - why do you think failing the query's a good idea? If the receiver is slow 
relative to the sender, this queue will grow, but that's not an error.


PS3, Line 137: SpinLock
> Isn't this a rather large amount of work to have under a spinlock?
Although it's quite a few lines, I don't think it's that much work. What 
operation looks expensive to you?


PS3, Line 191: // num_remaining_senders_ could be 0 because an AddBatch() can 
arrive *after* an
             :     // EndDataStream() RPC for the same sender, due to 
asynchrony on the sender side (the
             :     // sender gets closed or cancelled, but doesn't wait for the 
outstanding TransmitData()
             :     // to complete before trying to close the channel).
> If this is the only case where num_remaining_senders_ can be 0, then is the
We have to clean up the RPC state, and responding seems a good way to do that.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.h
File be/src/runtime/data-stream-recvr.h:

PS3, Line 174: good_bytes_received_counter_
> bytes_accepted_counter_ ?
Done


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/descriptors.h
File be/src/runtime/descriptors.h:

Line 546:   /// Serialize to the row_tuples field of a RowBatchPb.
> something called ToProto should materialize a message that corresponds to t
Done


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

Line 80:     ExecEnv::GetInstance()->stream_mgr()->AddData(finst_id, 
move(payload));
> This is a tad bit confusing. Why is there not a necessity to have a Respond
AddData() will always respond to the RPC, but may do so asynchronously, so we 
can't use a return value here. 

If AddData() is not called, we have to respond to the RPC right here to make 
sure it doesn't hang on the client. I added some comments.


PS3, Line 99: // TODO: Check return
> What he said.
Done


Line 118:     context->GetInboundSidecar(filter.header.directory_sidecar_idx(), 
&filter.directory);
> Same here, check return status.
Done


Line 155:   context->RespondSuccess();
> No need to return status.SetTStatus(&return_val) ?
Good catch, done.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-server.cc
File be/src/service/impala-server.cc:

PS3, Line 1925: move
> Include what you use? <utility>
Done


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala_internal_service.proto
File be/src/service/impala_internal_service.proto:

Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> let's move all .proto files into common/proto, that'll make it a lot easier
The tooling is set up so that proto files fit more neatly in the existing 
source directories, and during development I've found it convenient to have the 
generated files in the same directory as other source files. I'd rather keep 
them in the source dirs - finding them is not hard.


Line 32:   // Tuples for all rows
> incomprehensible comment.
Do you mean like this:

  message TupleId {
    int32 id = 1;
  }
?

If so I don't quite see the benefit vs the extra verbosity.


Line 46:   // Of type CatalogObjects.THdfsCompression (TODO(KRPC): native enum)
> why not use the enum?
We use THdfsCompression here because that's used elsewhere in the code where 
it's harder to change the serialization format (i.e. in Java). I could set up a 
mirror compression type in protobufs that's used for this field specifically, 
but that would be brittle (got to make sure the translation is done accurately 
from thrift<->proto and that no cases are missed). I think it's better to wait 
until we can use a proto enum everywhere.


Line 50: message TransmitDataRequestPb {
> you left out the service version (example: ImpalaInternalServiceVersion in 
I think there are some details that are underspecified in our current 
implementation that need some discussion - for example, how should version 
mismatches be indicated to the client? KRPC has an out-of-band error mechanism 
that we can use. 

Alternatively, we can add versions to response objects, with the convention 
that if the replied version != the requested version, no fields will be set and 
the RPC should fail. 

In Thrift only the request objects have a version field.

What should we do with messages that aren't parameters? Do they have version 
fields? Otherwise, how should they be versioned? (Say we want to add a field to 
StatusPb). Do we have StatusPbV1 etc?

I think this warrants a bit more discussion. I've changed all the fields to 
'optional' (that's the only possibility in protobuf v3, out of interest). I 
think that gives us some future-proofing while we get the details nailed down, 
since the absence of a protocol marker can be interpreted as V0.


Line 115: service ExecControlService {
> why not put the services in separate .proto files
Done


Line 131: service DataStreamService {
> would it make sense to have separate patches for the two services? it feels
I've started this, but it's a bit time-consuming moving thousands of lines of 
changes between commits. For now, please review the data stream changes, and 
eventually the control svc changes will move into a different patch.


Line 140:   // Called by the coordinator to deliver global runtime filters to 
fragment
> i consider the filters to be control structures. why wouldn't they be in ex
I don't think they are control structures. They have large payloads and contain 
tuple data.


http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impalad-main.cc
File be/src/service/impalad-main.cc:

PS3, Line 79: exec_env->Init();
> Need to check returned status and fail if necessary.
Done


-- 
To view, visit http://gerrit.cloudera.org:8080/5888
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <[email protected]>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-HasComments: Yes
