Joe McDonnell has posted comments on this change. ( http://gerrit.cloudera.org:8080/12037 )

Change subject: IMPALA-7928: Consistent remote read scheduling
......................................................................

Patch Set 5:

(55 comments)

http://gerrit.cloudera.org:8080/#/c/12037/5//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/12037/5//COMMIT_MSG@12
PS5, Line 12: cache that is at the file level).
> There is no other cache at the file level, so i think this parenthetical co
Good point, fixed.

http://gerrit.cloudera.org:8080/#/c/12037/5//COMMIT_MSG@15
PS5, Line 15: simluated
> nit: typo
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/experiments/hash-ring-util.cc
File be/src/experiments/hash-ring-util.cc:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/experiments/hash-ring-util.cc@39
PS5, Line 39: hashspace
> nit: hashspace or hash space (line below)
Changed to "hash space". I double checked that I'm consistent across the rest.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/experiments/hash-ring-util.cc@44
PS5, Line 44: HashRingUtil
> Can you think of a more descriptive name? HashRingConsistencyCheck or simil
I changed this to HashRingDistributionCheck. Are you ok with the file name?

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/backend-config.h
File be/src/scheduling/backend-config.h:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/backend-config.h@33
PS5, Line 33: /// hostnames to IP addresses.
> Maybe mention the hash ring in the class comment, too?
Added it to this comment.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/backend-config.h@83
PS5, Line 83: HashRing backend_ip_hashring_;
> nit: hash_ring? Or Hashring?
Switched to backend_ip_hash_ring_ and went through the code to consistently use "hash ring" and avoid "hashring".

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/backend-config.cc
File be/src/scheduling/backend-config.cc:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/backend-config.cc@22
PS5, Line 22: // TODO: determine the appropriate number of replicas for the hash-ring
> Are we going to do this, or leave for the future? Fine to leave; just check
Changed this comment to indicate there has been some testing, but there is room for real world tuning.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/backend-config.cc@23
PS5, Line 23: static const uint32_t NUM_HASHRING_REPLICAS = 25;
> How did you pick 25? Does this number have to have any special properties,
Updated comment to indicate that 25 is not special. I picked 25 with hand tests looking at max/min ratio for a variety of replications and number of backends. Left a TODO indicating that we can tune this further.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring-test.cc
File be/src/scheduling/hash-ring-test.cc:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring-test.cc@31
PS5, Line 31: class HashRingTest : public ::testing::Test {
> I don't see a test for GetNode(n) here. Something pretty basic should work.
I added a test for GetNode() which tests overflow and reads at some random points.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring-test.cc@39
PS5, Line 39: EXPECT_EQ(hash_ring.GetTotalReplicas(), num_nodes * hash_ring.GetNumReplicas());
> This assumes no collisions?
Yes, added a comment.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring-test.cc@42
PS5, Line 42: void VerifyTotalAllocation(const vector<TNetworkAddress>& addresses,
> I think these methods could use a comment, some of them weren't clear to me
I added comments to the Verify methods.
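
For reference, the max/min ratio mentioned for NUM_HASHRING_REPLICAS can be computed from each node's share of the hash space. The following is only a sketch under assumptions, not the code in this change: the ring is modeled as a plain std::map from position to node name, each position is assumed to own the range that ends at it, and DistributionMap and MaxMinRatio are hypothetical helper names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Sums, per node, how much of the 32-bit hash space it owns. Assumption: a
// ring position owns the range (previous position, position], and the first
// position also owns the range that wraps around past the last one.
std::map<std::string, uint64_t> DistributionMap(
    const std::map<uint32_t, std::string>& ring) {
  std::map<std::string, uint64_t> dist;
  if (ring.empty()) return dist;
  const uint64_t kHashSpace = uint64_t{1} << 32;
  uint64_t prev = ring.rbegin()->first;
  for (const auto& entry : ring) {
    uint64_t pos = entry.first;
    uint64_t range = (pos + kHashSpace - prev) % kHashSpace;
    if (range == 0) range = kHashSpace;  // A single position owns everything.
    dist[entry.second] += range;  // operator[] value-initializes to 0.
    prev = pos;
  }
  return dist;
}

// Ratio between the largest and smallest share; 1.0 is a perfect balance.
double MaxMinRatio(const std::map<std::string, uint64_t>& dist) {
  assert(!dist.empty());
  uint64_t lo = dist.begin()->second;
  uint64_t hi = lo;
  for (const auto& entry : dist) {
    lo = std::min(lo, entry.second);
    hi = std::max(hi, entry.second);
  }
  return static_cast<double>(hi) / static_cast<double>(lo);
}
```

Running this over rings built with different replica counts and backend counts gives the kind of comparison described above; the shares always sum to 2^32.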

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h
File be/src/scheduling/hash-ring.h:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@31
PS5, Line 31: TNetworkAddress
> For the purpose of scheduling we use IP addresses because whether a read is
Read through this some more, and it makes sense to use IpAddr. Switched the code over.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@42
PS5, Line 42: and expects the caller to do its own hashing for the item.
> Does this conflict with the TODO about "function pointer that generates a h
In this case, I'm talking about the probe side that is calling GetNode(). The templating would be for the nodes that the hashring holds (TNetworkAddress in this case). Changed some of the wording to clarify that the templating would be for the node type.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@45
PS5, Line 45: /// node is remapped. This allows for bounded disruption.
> Should we quantify the bound?
Added that the disruption should be proportional to 1 / num nodes.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@56
PS5, Line 56: : num_replicas_(num_replicas) {}
> assert that num_replicas >= 1?
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@58
PS5, Line 58: /// Copy constructor
> Why do we need a copy constructor? We need it to copy the backend, eh?
Yes, we make a copy of the BackendConfig for delta updates (Scheduler::UpdateMembership()). This copies the embedded HashRing.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@64
PS5, Line 64: for (auto it = hash_ring.node_hashmap_.begin();
> nit: you could use a range-based for loop, here and elsewhere.
Changed this to a range-based for.
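
To illustrate why the copy constructor in this thread cannot simply be defaulted: the map stores iterators into the node set, so a copy has to look up each node again in its own set. This is a minimal sketch with a range-based for, assuming a simplified class. MiniHashRing, AddAt, IteratorsAreLocal and NumPoints are hypothetical names, and nodes are plain strings rather than IpAddr.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>

// Hypothetical, simplified stand-in for the HashRing under review.
class MiniHashRing {
 public:
  using NodeIterator = std::set<std::string>::const_iterator;

  MiniHashRing() = default;

  // Copy the node set first, then rebuild the hash -> node map so that every
  // iterator points into this object's nodes_ rather than into other.nodes_.
  MiniHashRing(const MiniHashRing& other) : nodes_(other.nodes_) {
    for (const auto& entry : other.hash_to_node_) {
      NodeIterator new_it = nodes_.find(*entry.second);
      assert(new_it != nodes_.end());
      hash_to_node_.emplace(entry.first, new_it);
    }
  }

  // Copy assignment is not implemented in this sketch.
  MiniHashRing& operator=(const MiniHashRing&) = delete;

  // Places 'node' at an explicit ring position chosen by the caller.
  void AddAt(uint32_t hash, const std::string& node) {
    NodeIterator it = nodes_.insert(node).first;
    hash_to_node_.emplace(hash, it);
  }

  // True if every stored iterator refers to an element of this object's set.
  bool IteratorsAreLocal() const {
    for (const auto& entry : hash_to_node_) {
      NodeIterator it = nodes_.find(*entry.second);
      if (it == nodes_.end() || &*it != &*entry.second) return false;
    }
    return true;
  }

  std::size_t NumPoints() const { return hash_to_node_.size(); }

 private:
  std::set<std::string> nodes_;
  std::map<uint32_t, NodeIterator> hash_to_node_;
};
```

A defaulted copy would duplicate the iterators as-is, leaving the new map pointing at the source object's set; IteratorsAreLocal() is the check that would catch that.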

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@64
PS5, Line 64: for (auto it = hash_ring.node_hashmap_.begin();
            :      it != hash_ring.node_hashmap_.end();
            :      it++) {
            :   uint32_t hash_value = it->first;
            :   NodeIterator old_node_it = it->second;
            :   // Look up the equivalent node iterator in the new nodes_ set.
            :   NodeIterator new_node_it = nodes_.find(*old_node_it);
            :   DCHECK(new_node_it != nodes_.end());
            :   node_hashmap_.insert(std::pair<int32_t, NodeIterator>(hash_value, new_node_it));
            : }
> Should this be moved out of the header?
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@76
PS5, Line 76: HashRing(HashRing&&) = delete;
> Deleting a ctor seems uncommon enough to warrant a comment.
I added a comment. I'm deleting because I haven't tested it or implemented it.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@90
PS5, Line 90: /// value.
> Add: The hash ring must not be empty.
Added comment. When constructing Scheduler::AssignmentCtx, we do:
  DCHECK_GT(executor_config.NumBackends(), 0);
At that point, we have our reference to the BackendConfig, and nothing will change on that. Before that, in Scheduler::ComputeScanRangeAssignments(), we do:
  if (executor_config.NumBackends() == 0 && !exec_at_coord) {
    return Status(TErrorCode::NO_REGISTERED_BACKENDS);
  }

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@97
PS5, Line 97: /// this TNetworkAddress. This is useful for examining how the hash space is balanced
> Can you include in the comment how the portion is represented in the int64_
Updated the comment to add some description.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@99
PS5, Line 99: void GetDistributionMap(std::map<TNetworkAddress, uint64_t>& distribution_map) const;
> Pass output parameter by pointer.
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@106
PS5, Line 106: std::set<TNetworkAddress> nodes_;
> Should we do this in the backend config, possibly in a vector, and then jus
I think doing some work to reduce our use of strings in the scheduler would be great. I think it needs much more care than what I'm doing here. It is easy for us to change this later to remove this set when we know that what we are storing is pointer-sized or smaller. Vector would need a complete rebuild of structures on node add/remove (which probably is cheap and fine), because indexes/iterators are invalidated by a variety of operations.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@111
PS5, Line 111: /// of the two underlying TNetworkAddress's. Collisions should be rare, so this will
> This looks like the birthday paradox and collisions might be more common th
Two thoughts here:
1. As you note, it is very important whether colliding once causes colliding everywhere. In this upload, I change the code to just feed the initial hash into a prng and then sample from that. If two nodes hash to the same initial hash (which is fed into the prng), then we are out of luck. However, if they don't, then they have distinct prng seeds. As long as the prng has extra state beyond the output, then the extra state will cause them to have different sequences. I'm using pcg32, which has 32-bit output and 64-bit state. It is not simple to predict based on a single output value.
2. In the case that collisions are not correlated, as the probability of collision goes up (due to more nodes), the amount of the hash space that maps to each node goes down. In your example, there are 372 * 25 = 9300 nodes, which means that the portion of the hash space controlled by a single hash of a node should be about 1 / 9300.
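
The seeding scheme in point 1 can be sketched roughly as follows. This is an illustration under assumptions rather than the code in the change: std::mt19937 stands in for pcg32, std::hash stands in for the initial node hash, the ring is a plain std::map, and ReplicaPositions and AddNode are hypothetical names. The collision policy (first writer wins) is also just a choice made for the sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <random>
#include <string>
#include <vector>

// Returns 'num_replicas' ring positions for a node, drawn from a PRNG that is
// seeded with the node's initial hash. std::mt19937 is a stand-in here; the
// change under review uses pcg32.
std::vector<uint32_t> ReplicaPositions(uint32_t initial_hash, int num_replicas) {
  assert(num_replicas >= 1);
  std::mt19937 prng(initial_hash);
  std::vector<uint32_t> positions;
  positions.reserve(num_replicas);
  for (int i = 0; i < num_replicas; ++i) {
    positions.push_back(static_cast<uint32_t>(prng()));
  }
  return positions;
}

// Inserts a node's positions into the ring. In this sketch an existing entry
// wins on a collision, so a node can end up with fewer than 'num_replicas'
// positions. Returns the number of positions actually added.
int AddNode(std::map<uint32_t, std::string>* ring, const std::string& node,
            int num_replicas) {
  uint32_t initial_hash = static_cast<uint32_t>(std::hash<std::string>{}(node));
  int added = 0;
  for (uint32_t pos : ReplicaPositions(initial_hash, num_replicas)) {
    if (ring->emplace(pos, node).second) ++added;
  }
  return added;
}
```

Two nodes with the same initial hash get identical sequences and collide everywhere, which is the "out of luck" case; with different seeds, an overlap at one position says little about the rest of the sequence.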

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.h@113
PS5, Line 113: std::map<uint32_t, NodeIterator> node_hashmap_;
> It seems a bit confusing that a ordered map is called hash_map. Can you thi
Good point, changed to hash_to_node_.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc
File be/src/scheduling/hash-ring.cc:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@57
PS5, Line 57: node_hashmap_[hash_val] = node_it;
> If there's a collision, then another node hashed to the same hash_val, righ
I'm now using the initial hash as a seed for a prng, which is pcg32. As commented elsewhere, if the initial seed is identical, we are unhappy. However, if two sequences with different seeds intersect, there isn't a strong reason to think that more of the sequences would overlap.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@78
PS5, Line 78: DCHECK_GT(num_removed, 0);
> If we have just the right hash collisions, it's possible that num_removed =
Yes, it is possible that when adding a node, every single add is a collision. Then, there would be no nodes to remove.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@83
PS5, Line 83: DCHECK_GT(node_hashmap_.size(), 0);
> !node_hashmap.empty()
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@91
PS5, Line 91: return &(*node_it);
> If node_it == node_hashmap_.end(), should you return nullptr here?
Changed this to return nullptr when empty. The DCHECK moves to scheduler code.
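
A lookup with the nullptr-on-empty behavior described above might look roughly like this. It is a sketch under assumptions: the ring is a plain std::map from position to node name, and the lookup direction (first position at or after the probe hash, wrapping to the start) is assumed rather than taken from the change.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Returns the node owning 'hash_value': the first ring entry at or after it,
// wrapping around to the first entry when the lookup runs off the end.
// Returns nullptr for an empty ring, leaving the emptiness check to callers.
const std::string* GetNode(const std::map<uint32_t, std::string>& ring,
                           uint32_t hash_value) {
  if (ring.empty()) return nullptr;
  auto it = ring.lower_bound(hash_value);
  if (it == ring.end()) it = ring.begin();
  return &it->second;
}
```

With this shape the caller decides whether an empty ring is an error, which matches moving the DCHECK into the scheduler.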

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@94
PS5, Line 94: void HashRing::GetDistributionMap(map<TNetworkAddress, uint64_t>& distribution_map) const {
> line too long (91 > 90)
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@104
PS5, Line 104: TNetworkAddress addr = *(it->second);
> nit: consistent naming, here vs ip_addr below
Changed ip_addr below to addr.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@107
PS5, Line 107: if (distribution_map.find(addr) != distribution_map.end()) {
             :   distribution_map[addr] += range;
             : } else {
             :   distribution_map[addr] = range;
             : }
> I think the map default-initializes when using [], so you can just do map[a
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/hash-ring.cc@115
PS5, Line 115: UINT_MAX
> If you use (1<<32) here, then the two parts of the bucket that wraps from U
Yes, I'm trying to decide which makes the code clearer. I increment range separately with a comment, but a statement with 1<<32 and a comment could be just as clear. I've left this as-is, but I don't really have a strong preference.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test-util.h
File be/src/scheduling/scheduler-test-util.h:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test-util.h@271
PS5, Line 271: query_options_.num_simulated_remote_replicas = num;
> nit: move to .cc file if it doesn't fit into a single line, like SetReplica
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test-util.cc
File be/src/scheduling/scheduler-test-util.cc:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test-util.cc@531
PS5, Line 531: return host.ip + ":" + std::to_string(host.be_port);
> I'm surprised this is necessary, are the IP addresses generated by HostIdxT
Good point, removed this change.
I thought this fixed an issue I saw, but I ran tests without it and everything is fine.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test.cc
File be/src/scheduling/scheduler-test.cc:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test.cc@220
PS5, Line 220: for (int i = 0; i < num_data_nodes; ++i) cluster.AddHost(false, true);
> There's also cluster.AddHosts which does the same as these two loops
Good point, switched over. (Same thing in SimulatedRemoteReplicasConsistency)

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test.cc@229
PS5, Line 229: for (int32_t num_replicas = 1; num_replicas <= num_impala_nodes + 1; num_replicas++) {
> nit: use int and pre-increment to stay consistent within the file.
Fixed

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test.cc@248
PS5, Line 248: all blocks
> The file has 3 blocks, isn't it all scan ranges that are assigned to the sa
Oh, that's right. Fixed the comment.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler-test.cc@293
PS5, Line 293: }
> you could assert that the number of backends is 1 less after the loop or th
Added a check that RemoveBackend is called once. The number of backends is not really exposed by SchedulerWrapper or Scheduler.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.h
File be/src/scheduling/scheduler.h:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.h@189
PS5, Line 189: simulated_remote_replicas
> When I first read this it wasn't entirely clear to me from the comment what
Switched to GetRemoteExecutorCandidates()

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.h@191
PS5, Line 191: /// the file name from 'hdfs_file_split' multiple times and look up the closest
> Have you considered passing the offset into the hash, too? Would that incre
That is a good idea.
We want big files to get split up, and this shouldn't cause problems for Parquet. I'm changing this to pass in the offset.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.h@198
PS5, Line 198: int32_t num_remote_replicas, vector<IpAddr>& simulated_remote_replicas);
> We usually pass output parameters by pointer
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@708
PS5, Line 708: num_simulated_remote_replicas,
> nit: line wrapping
Combined the last two lines.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@904
PS5, Line 904: const THdfsFileSplit* hdfs_file_split, int32_t num_remote_replicas,
> I think we usually just use "int" instead of int32_t, here and elsewhere.
Fixed

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@907
PS5, Line 907: DCHECK_EQ(simulated_remote_replicas.size(), 0);
> You could call reserve(num_remote_replicas) on simulated_remote_replicas, t
Based on a quick search, vector starts with capacity 10. num_simulated_remote_replicas has range [1,16] (and defaults to 3), so I think I'm going to skip this.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@914
PS5, Line 914: set<TNetworkAddress> distinct_backends;
> Does the algorithm require the set to be ordered?
No, so maybe an unordered_set would be faster.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@919
PS5, Line 919: uint32_t max_iterations = num_remote_replicas * 2;
> We usually don't use unsigned types per convention unless required by some
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@920
PS5, Line 920: for (uint32_t i = 0; i < max_iterations; i++) {
> I think we usually use pre-increment.
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@930
PS5, Line 930: for (auto it = distinct_backends.begin(); it != distinct_backends.end(); it++) {
> nit: use a range based loop?
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/scheduling/scheduler.cc@931
PS5, Line 931: const TBackendDescriptor *be_desc = executors_config_.LookUpBackendDesc(*it);
> nit: move space to right side of *
Done

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/service/query-options.h
File be/src/service/query-options.h:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/service/query-options.h@149
PS5, Line 149: TQueryOptionLevel::DEVELOPMENT)\
> I would prefer "advanced" since this change sets a value of 3 which will ch
Changed this to advanced.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/util/hash-util.h
File be/src/util/hash-util.h:

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/util/hash-util.h@282
PS5, Line 282: This is intended to be a slight improvement on that.
> Can you mention the benefits of doing that in the comment?
I killed off this code.

http://gerrit.cloudera.org:8080/#/c/12037/5/be/src/util/hash-util.h@290
PS5, Line 290: class MultipleHashGenerator {
> I suspect we can just use Rehash32to32 above.
Killed off this code in favor of using the pcg pseudorandom number generator that we already have.

http://gerrit.cloudera.org:8080/#/c/12037/5/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/12037/5/common/thrift/ImpalaService.thrift@355
PS5, Line 355: // The number of simulated replicas to use when scheduling remote HDFS ranges.
> The word "simulated" feels unintuitive to me, how about NUM_REMOTE_READ_CAN
Yes, reading through the code, it doesn't make much sense. Changed this to NUM_REMOTE_EXECUTOR_CANDIDATES.

http://gerrit.cloudera.org:8080/#/c/12037/5/common/thrift/ImpalaService.thrift@357
PS5, Line 357: will
> ... that can read ...?
Done

--
To view, visit http://gerrit.cloudera.org:8080/12037
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Icbf74088a8bd8c285ab7285ea3a01acd1bb53a45
Gerrit-Change-Number: 12037
Gerrit-PatchSet: 5
Gerrit-Owner: Joe McDonnell
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Joe McDonnell
Gerrit-Reviewer: Lars Volker
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Philip Zeyliger
Gerrit-Comment-Date: Tue, 05 Feb 2019 02:46:08 +0000
Gerrit-HasComments: Yes
