Henry Robinson has posted comments on this change.

Change subject: IMPALA-4856: Port ImpalaInternalService to KRPC
......................................................................
Patch Set 3:

(30 comments)

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.cc
File be/src/runtime/data-stream-mgr.cc:

PS3, Line 327: VLOG_QUERY << "DataStreamMgr maintenance tasks complete. Took: "
            :     << PrettyPrinter::Print(MonotonicMillis() - start, TUnit::TIME_MS);
> Do you think it's worth printing what maintenance tasks were done in this i
Removed for now.

http://gerrit.cloudera.org:8080/#/c/5888/2/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

PS2, Line 59: In the first phase the sender initiates
Removed.

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-mgr.h
File be/src/runtime/data-stream-mgr.h:

Line 60: /// first batch. Since the sender may start sending before the receiver is ready, the data
> we could relatively easily remove that restriction, the execrpc changes are
Any change I can think of would be either substantial (a two-phase start-up protocol) or bug-prone (waiting).

Line 77: /// fixed-size buffer for later processing, or discard the batch if the buffer is full. In
> how can the is-full case come up? shouldn't flow control keep the sender fr
No, for a few reasons:

1. Flow control doesn't kick in until the first round of batches has been sent, so all senders try to send batches at once.
2. In the steady-state case with a receiver that's faster than the senders, there'll be no queuing. But if the receiver suddenly stalls, there can be many senders still preparing a new batch, and together they can fill up the queue.
3. The queue is limited by the byte size of the batches. The next batch might be very large and overflow the queue, and we can't predict that in general.

Line 84: /// queue. An error code is returned which causes the sender to retry the previous batch.
> sounds complicated. is that really necessary?
I'm not sure which part you're referring to, but yes, in general this is how the flow control is implemented.
The pending sender list replaces the set of blocked threads from Thrift, and is lighter weight because it gives us the ability to discard the row-batch payload. The error code is used to signal to the RPC layer on the sender side that the RPC should be retried. This has the benefit of making the retry transparent to the caller code.

Line 113: /// time-out and cancel itself. However, it is usual that the coordinator will initiate
> i guess that extra complication is necessary because the non-existence of a
Exactly.

Line 124: /// The sender has a default timeout of 2 minutes for TransmitData() calls. If the timeout
> why so long?
The former case is detected in almost all cases. If the receiver has shown up and been closed, the sender will find it in the closed-stream cache.

Line 131: /// immediately. Both of these cases are designed so that the sender can prepare a new
> in the queued case, it seems like the response should go out when the queue
In the queued-batch case, we could check whether the queue had any more capacity before responding to the sender. However, it's hard to know how much capacity a sender needs for its next batch (batches with strings can vary a lot in size). This strategy is optimistic: if the receiver is able to accept a batch, we presume that it can accept the next one as well. In my tests, this worked better than pessimistically pausing senders until we knew for sure there was room for their particular batch.

PS3, Line 135: idea
> I would rather use the word 'assumption' here.
Done

Line 137: /// notification should not exceed 60s and so the sender will not time out.
> that also sounds complicated.
The sender needs a timeout, after which it will fail, and so the receiver has to try to respond within that timeout. That's all that's going on here: picking a timeout, and then picking a response time that (modulo standard distributed-systems issues) is very likely to be less than that timeout. This avoids false negatives.
PS3, Line 212: TRANSMIT_DATA_TIMEOUT_SECONDS
> I think there needs to be some comment clearly distinguishing between this
I removed this because it was so similar to datastream_sender_timeout_ms that it made sense to just use that flag for both the initial timeout and the row-batch processing timeout.

Line 245: void AddData(const TUniqueId& fragment_instance_id, TransmitDataCtx&& payload);
> in other words, pass in a pointer and stipulate that AddData owns it.
I feel that rvalue references do make ownership explicit. You can't pass a non-temporary value without move(X), which makes it very clear. I think it's better to make it clear from the code (you cannot pass a TransmitDataCtx without relinquishing ownership) than it is to use comments to convey a convention.

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.cc
File be/src/runtime/data-stream-recvr.cc:

PS3, Line 129: pending_senders_
> Maybe this could be future work, but I foresee a need to cap this at some n
Hm, why do you think failing the query is a good idea? If the receiver is slow relative to the sender, this queue will grow, but that's not an error.

PS3, Line 137: SpinLock
> Isn't this a rather large amount of work to have under a spinlock?
Although it's quite a few lines, I don't think it's that much work. Which operation looks expensive to you?

PS3, Line 191: // num_remaining_senders_ could be 0 because an AddBatch() can arrive *after* an
            : // EndDataStream() RPC for the same sender, due to asynchrony on the sender side (the
            : // sender gets closed or cancelled, but doesn't wait for the outstanding TransmitData()
            : // to complete before trying to close the channel).
> If this is the only case where num_remaining_senders_ can be 0, then is the
We have to clean up the RPC state, and responding seems a good way to do that.
http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/data-stream-recvr.h
File be/src/runtime/data-stream-recvr.h:

PS3, Line 174: good_bytes_received_counter_
> bytes_accepted_counter_ ?
Done

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/runtime/descriptors.h
File be/src/runtime/descriptors.h:

Line 546: /// Serialize to the row_tuples field of a RowBatchPb.
> something called ToProto should materialize a message that corresponds to t
Done

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-internal-service.cc
File be/src/service/impala-internal-service.cc:

Line 80: ExecEnv::GetInstance()->stream_mgr()->AddData(finst_id, move(payload));
> This is a tad bit confusing. Why is there not a necessity to have a Respond
AddData() will always respond to the RPC, but may do so asynchronously, so we can't use a return value here. If AddData() is not called, we have to respond to the RPC right here to make sure it doesn't hang on the client. I added some comments.

PS3, Line 99: // TODO: Check return
> What he said.
Done

Line 118: context->GetInboundSidecar(filter.header.directory_sidecar_idx(), &filter.directory);
> Same here, check return status.
Done

Line 155: context->RespondSuccess();
> No need to return status.SetTStatus(&return_val) ?
Good catch, done.

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala-server.cc
File be/src/service/impala-server.cc:

PS3, Line 1925: move
> Include what you use? <utility>
Done

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impala_internal_service.proto
File be/src/service/impala_internal_service.proto:

Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> let's move all .proto files into common/proto, that'll make it a lot easier
The tooling is set up so that proto files fit more neatly in the existing source directories, and during development I've found it convenient to have the generated files in the same directory as other source files.
I'd rather keep them in the source dirs; finding them is not hard.

Line 32: // Tuples for all rows
> incomprehensible comment.
Do you mean like this: message TupleId { int32 id = 1; }? If so, I don't quite see the benefit vs. the extra verbosity.

Line 46: // Of type CatalogObjects.THdfsCompression (TODO(KRPC): native enum)
> why not use the enum?
We use THdfsCompression here because it's used elsewhere in the code where it's harder to change the serialization format (i.e. in Java). I could set up a mirror compression type in protobufs that's used specifically for this field, but that would be brittle (we'd have to make sure the Thrift<->proto translation is done accurately and that no cases are missed). I think it's better to wait until we can use a proto enum everywhere.

Line 50: message TransmitDataRequestPb {
> you left out the service version (example: ImpalaInternalServiceVersion in
I think there are some details that are underspecified in our current implementation and need some discussion. For example, how should version mismatches be indicated to the client? KRPC has an out-of-band error mechanism that we can use. Alternatively, we can add versions to response objects, with the convention that if the replied version != the requested version, no fields will be set and the RPC should fail. In Thrift, only the request objects have a version field. What should we do with messages that aren't parameters? Do they have version fields? Otherwise, how should they be versioned? (Say we want to add a field to StatusPb - do we have StatusPbV1, etc.?) I think this warrants a bit more discussion.

I've changed all the fields to 'optional' (that's the only possibility in protobuf v3, out of interest). I think that gives us some future-proofing while we get the details nailed down, since the absence of a protocol marker can be interpreted as V0.
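To illustrate the "absence of a protocol marker is V0" idea: in proto3, a scalar field that was never set decodes to its zero default, so an unversioned sender and a version-0 reading agree automatically. A hedged sketch with invented field names, not the patch's actual message definition:

```proto
syntax = "proto3";

// Hypothetical sketch only. In proto3 all singular fields are optional, so a
// later revision can introduce a version marker without breaking old senders:
// messages from today's senders decode with protocol_version == 0, which the
// receiver reads as "V0".
message TransmitDataRequestPb {
  int32 protocol_version = 1;  // unset by current senders => defaults to 0
  bytes dest_fragment_instance_id = 2;
  int32 sender_id = 3;
}
```

The same reasoning is why adding new optional fields later is safe: old readers simply see them as absent.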
Line 115: service ExecControlService {
> why not put the services in separate .proto files
Done

Line 131: service DataStreamService {
> would it make sense to have separate patches for the two services? it feels
I've started this, but it's time-consuming moving thousands of lines of changes between commits. For now, please review the data stream changes; eventually the control service changes will move into a different patch.

Line 140: // Called by the coordinator to deliver global runtime filters to fragment
> i consider the filters to be control structures. why wouldn't they be in ex
I don't think they are control structures. They have large payloads and contain tuple data.

http://gerrit.cloudera.org:8080/#/c/5888/3/be/src/service/impalad-main.cc
File be/src/service/impalad-main.cc:

PS3, Line 79: exec_env->Init();
> Need to check returned status and fail if necessary.
Done

--
To view, visit http://gerrit.cloudera.org:8080/5888
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I95229290566a8ccffd80ed2d74c1c57cf1479238
Gerrit-PatchSet: 3
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Henry Robinson <[email protected]>
Gerrit-Reviewer: Anonymous Coward #168
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-HasComments: Yes
