This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit a20977a5c0b37d20aab13bb441756f887a2d1c59
Author: Joe McDonnell <joemcdonn...@cloudera.com>
AuthorDate: Thu Jun 6 13:18:13 2019 -0700

    IMPALA-8630: Hash the full path when calculating consistent remote placement

    Consistent remote placement currently uses the relative filename within a
    partition for the consistent hash. If the relative filenames for different
    partitions have a simple naming scheme, then multiple partitions may have
    files of the same name. This is true for some tables written by Hive (e.g.
    in our minicluster the tpcds.store_sales table has this problem). This can
    lead to unbalanced placement of remote ranges.

    This adds a partition_path_hash to the THdfsFileSplit and
    THdfsFileSplitGeneratorSpec, calculated in the frontend (which has all of
    the partition information). The scheduler hashes this in addition to the
    relative path.

    Testing:
     - Added several new scheduler tests that verify that consistent remote
       scheduling sees blocks with different relative paths, partition paths,
       or offsets as distinct.
     - Ran core tests

    Change-Id: I46c739fc31af539af2b3509e2a161f4e29f44d7b
    Reviewed-on: http://gerrit.cloudera.org:8080/13545
    Reviewed-by: Joe McDonnell <joemcdonn...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/scheduling/scheduler-test-util.cc           | 134 +++++++++++++++--
 be/src/scheduling/scheduler-test-util.h            |  60 +++++++-
 be/src/scheduling/scheduler-test.cc                | 165 +++++++++++++++++----
 be/src/scheduling/scheduler.cc                     |  11 +-
 common/thrift/PlanNodes.thrift                     |   9 ++
 .../org/apache/impala/planner/HdfsScanNode.java    |   5 +-
 .../org/apache/impala/planner/ExplainTest.java     |   1 +
 7 files changed, 331 insertions(+), 54 deletions(-)
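For readers skimming the patch, the following is a minimal standalone sketch of
the problem being fixed. It is not Impala code: ChainHash is an illustrative
stand-in for HashUtil::Hash(data, len, seed), and the paths are made up. It
shows why hashing only the relative filename collapses same-named files from
different partitions onto the same executors, while seeding the chain with a
partition-path hash separates them.

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <string>

    // Illustrative seeded hash chain (a stand-in for HashUtil::Hash).
    uint32_t ChainHash(const std::string& value, uint32_t seed) {
      uint32_t h = static_cast<uint32_t>(std::hash<std::string>{}(value));
      return h ^ (seed + 0x9e3779b9u + (h << 6) + (h >> 2));
    }

    int main() {
      // Hive often writes the same file name into every partition:
      const std::string part_2009 = "/warehouse/store_sales/year=2009";
      const std::string part_2010 = "/warehouse/store_sales/year=2010";
      const std::string relative_path = "000001_0";

      // Before the patch: only the relative path fed the consistent hash, so
      // both files hash identically and get identical executor candidates.
      std::cout << ChainHash(relative_path, 0) << " == "
                << ChainHash(relative_path, 0) << "\n";

      // After the patch: the partition path hash seeds the chain, so the two
      // files almost certainly hash differently and spread across executors.
      std::cout << ChainHash(relative_path, ChainHash(part_2009, 0)) << " != "
                << ChainHash(relative_path, ChainHash(part_2010, 0)) << "\n";
    }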
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 314b32a..1dd1b8c 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -25,6 +25,7 @@
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/cluster-membership-test-util.h"
 #include "scheduling/scheduler.h"
+#include "util/hash-util.h"

 using namespace impala;
 using namespace impala::test;
@@ -32,6 +33,25 @@ using namespace org::apache::impala::fb;

 DECLARE_int32(krpc_port);

+/// Make the BlockNamingPolicy enum easy to print. Must be declared in impala::test
+namespace impala {
+namespace test {
+std::ostream& operator<<(std::ostream& os, const BlockNamingPolicy& naming_policy)
+{
+  switch(naming_policy) {
+    case BlockNamingPolicy::UNPARTITIONED:
+      os << "UNPARTITIONED"; break;
+    case BlockNamingPolicy::PARTITIONED_SINGLE_FILENAME:
+      os << "PARTITIONED_SINGLE_FILENAME"; break;
+    case BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES:
+      os << "PARTITIONED_UNIQUE_FILENAMES"; break;
+    default: os.setstate(std::ios_base::failbit);
+  }
+  return os;
+}
+}
+}
+
 /// Sample 'n' elements without replacement from the set [0..N-1].
 /// This is an implementation of "Algorithm R" by J. Vitter.
 void SampleN(int n, int N, vector<int>* out) {
@@ -115,6 +135,15 @@ void Cluster::GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
   addr->port = hosts_[host_idx].be_port;
 }

+Cluster Cluster::CreateRemoteCluster(int num_impala_nodes, int num_data_nodes) {
+  Cluster cluster;
+  // Set of Impala hosts
+  cluster.AddHosts(num_impala_nodes, true, false);
+  // Set of datanodes
+  cluster.AddHosts(num_data_nodes, false, true);
+  return cluster;
+}
+
 const vector<int>& Cluster::datanode_with_backend_host_idxs() const {
   return datanode_with_backend_host_idxs_;
 }
@@ -123,6 +152,16 @@ const vector<int>& Cluster::datanode_only_host_idxs() const {
   return datanode_only_host_idxs_;
 }

+void Schema::AddEmptyTable(
+    const TableName& table_name, BlockNamingPolicy naming_policy) {
+  DCHECK(tables_.find(table_name) == tables_.end());
+  // Create table
+  Table table;
+  table.naming_policy = naming_policy;
+  // Insert table
+  tables_.emplace(table_name, table);
+}
+
 void Schema::AddSingleBlockTable(
     const TableName& table_name, const vector<int>& non_cached_replica_host_idxs) {
   AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
@@ -154,14 +193,17 @@ void Schema::AddSingleBlockTable(const TableName& table_name,

 void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
     ReplicaPlacement replica_placement, int num_replicas) {
-  AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0);
+  AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0,
+      BlockNamingPolicy::UNPARTITIONED);
 }

 void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
-    ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) {
+    ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas,
+    BlockNamingPolicy naming_policy) {
   DCHECK_GT(num_replicas, 0);
   DCHECK(num_cached_replicas <= num_replicas);
   Table table;
+  table.naming_policy = naming_policy;
   for (int i = 0; i < num_blocks; ++i) {
     Block block;
     vector<int>& replica_idxs = block.replica_host_idxs;
@@ -238,25 +280,28 @@ void Plan::AddTableScan(const TableName& table_name) {
   for (int i = 0; i < blocks.size(); ++i) {
     const Block& block = blocks[i];
     TScanRangeLocationList scan_range_locations;
-    BuildTScanRangeLocationList(table_name, block, i, &scan_range_locations);
+    BuildTScanRangeLocationList(table_name, block, i, table.naming_policy,
+        &scan_range_locations);
     scan_range_specs_.concrete_ranges.push_back(scan_range_locations);
   }
   const vector<FileSplitGeneratorSpec>& specs = table.specs;
   for (int i = 0; i < specs.size(); ++i) {
     const FileSplitGeneratorSpec& file_spec = specs[i];
     TFileSplitGeneratorSpec spec;
-    BuildScanRangeSpec(table_name, file_spec, i, &spec);
+    BuildScanRangeSpec(table_name, file_spec, i, table.naming_policy, &spec);
     scan_range_specs_.split_specs.push_back(spec);
   }
 }

 void Plan::BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
-    int block_idx, TScanRangeLocationList* scan_range_locations) {
+    int block_idx, BlockNamingPolicy naming_policy,
+    TScanRangeLocationList* scan_range_locations) {
   const vector<int>& replica_idxs = block.replica_host_idxs;
   const vector<bool>& is_cached = block.replica_host_idx_is_cached;
   DCHECK_EQ(replica_idxs.size(), is_cached.size());
   int num_replicas = replica_idxs.size();
-  BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range);
+  BuildScanRange(table_name, block, block_idx, naming_policy,
+      &scan_range_locations->scan_range);
   scan_range_locations->locations.resize(num_replicas);
   for (int i = 0; i < num_replicas; ++i) {
     TScanRangeLocation& location = scan_range_locations->locations[i];
@@ -265,16 +310,68 @@ void Plan::BuildTScanRangeLocationList(const TableName& table_name, const Block&
   }
 }

+void Plan::GetBlockPaths(const TableName& table_name, bool is_spec, int index,
+    BlockNamingPolicy naming_policy, string* relative_path, int64_t* partition_id,
+    string* partition_path) {
+  // For debugging, it is useful to differentiate between Blocks and
+  // FileSplitGeneratorSpecs.
+  string spec_or_block = is_spec ? "_spec_" : "_block_";
+
+  switch (naming_policy) {
+    case BlockNamingPolicy::UNPARTITIONED:
+      // For unpartitioned tables, use unique file names and set partition_id = 0.
+      // Encoding the table name and index in the file helps debugging.
+      // For example, an unpartitioned table may contain two files with paths like:
+      // /warehouse/{table_name}/file1.csv
+      // /warehouse/{table_name}/file2.csv
+      *relative_path = table_name + spec_or_block + std::to_string(index);
+      *partition_id = 0;
+      *partition_path = "/warehouse/" + table_name;
+      break;
+    case BlockNamingPolicy::PARTITIONED_SINGLE_FILENAME:
+      // For partitioned tables with simple names, use a simple file name, but vary the
+      // partition id and partition_path by using the index as the partition_id and as
+      // part of the partition_path.
+      // For example, a partitioned table with two partitions might have paths like:
+      // /warehouse/{table_name}/year=2010/000001_0
+      // /warehouse/{table_name}/year=2009/000001_0
+      *relative_path = "000001_0";
+      *partition_id = index;
+      *partition_path = "/warehouse/" + table_name + "/part=" + std::to_string(index);
+      break;
+    case BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES:
+      // For partitioned tables with unique names, the file name, partition_id, and
+      // partition_path all incorporate the index.
+      // For example, a partitioned table with two partitions might have paths like:
+      // /warehouse/{table_name}/year=2010/6541856e3fb0583d_654267678_data.0.parq
+      // /warehouse/{table_name}/year=2009/6541856e3fb0583d_627636719_data.0.parq
+      *relative_path = table_name + spec_or_block + std::to_string(index);
+      *partition_id = index;
+      *partition_path = "/warehouse/" + table_name + "/part=" + std::to_string(index);
+      break;
+    default:
+      DCHECK(false) << "Invalid naming_policy";
+  }
+}
+
 void Plan::BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
-    TScanRange* scan_range) {
+    BlockNamingPolicy naming_policy, TScanRange* scan_range) {
   // Initialize locations.scan_range correctly.
   THdfsFileSplit file_split;
-  // 'length' is the only member considered by the scheduler.
+  // 'length' is the only member considered by the scheduler for scheduling non-remote
+  // blocks.
   file_split.length = block.length;
-  // Encoding the table name and block index in the file helps debugging.
-  file_split.relative_path = table_name + "_block_" + std::to_string(block_idx);
+
+  // Consistent remote scheduling considers the partition_path_hash, relative_path,
+  // and offset when scheduling blocks.
+  string partition_path;
+  GetBlockPaths(table_name, false, block_idx, naming_policy, &file_split.relative_path,
+      &file_split.partition_id, &partition_path);
+  file_split.partition_path_hash =
+      static_cast<int32_t>(HashUtil::Hash(partition_path.data(),
+          partition_path.length(), 0));
   file_split.offset = 0;
-  file_split.partition_id = 0;
+  // For now, we model each file by a single block.
   file_split.file_length = block.length;
   file_split.file_compression = THdfsCompression::NONE;

@@ -283,13 +380,19 @@ void Plan::BuildScanRange(const TableName& table_name, const Block& block, int b
 }

 void Plan::BuildScanRangeSpec(const TableName& table_name,
-    const FileSplitGeneratorSpec& spec, int spec_idx,
+    const FileSplitGeneratorSpec& spec, int spec_idx, BlockNamingPolicy naming_policy,
     TFileSplitGeneratorSpec* thrift_spec) {
   THdfsFileDesc thrift_file;

+  string relative_path;
+  int64_t partition_id;
+  string partition_path;
+  GetBlockPaths(table_name, true, spec_idx, naming_policy, &relative_path,
+      &partition_id, &partition_path);
+
   flatbuffers::FlatBufferBuilder fb_builder;
   auto rel_path =
-      fb_builder.CreateString(table_name + "_spec_" + std::to_string(spec_idx));
+      fb_builder.CreateString(relative_path);
   auto fb_file_desc = CreateFbFileDesc(fb_builder, rel_path, spec.length);
   fb_builder.Finish(fb_file_desc);
@@ -297,10 +400,13 @@ void Plan::BuildScanRangeSpec(const TableName& table_name,
       reinterpret_cast<const char*>(fb_builder.GetBufferPointer()), fb_builder.GetSize());
   thrift_file.__set_file_desc_data(buffer);

-  thrift_spec->__set_partition_id(0);
+  thrift_spec->__set_partition_id(partition_id);
   thrift_spec->__set_file_desc(thrift_file);
   thrift_spec->__set_max_block_size(spec.block_size);
   thrift_spec->__set_is_splittable(spec.is_splittable);
+  int32_t partition_path_hash = static_cast<int32_t>(HashUtil::Hash(partition_path.data(),
+      partition_path.length(), 0));
+  thrift_spec->__set_partition_path_hash(partition_path_hash);
 }

 int Plan::FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h
index 30c0f76..4005927 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -87,6 +87,24 @@ enum class ReplicaPlacement {
   REMOTE_ONLY,
 };

+/// Blocks and FileSplitGeneratorSpecs mimic real files, and for some parts of scheduling
+/// (e.g. consistent remote scheduling) the actual file names and partition paths matter.
+/// When defining a table, you can specify a block naming policy to control this.
+/// - UNPARTITIONED means that the partition paths and partition ids are constant, but
+///   the file names are unique.
+/// - PARTITIONED_SINGLE_FILENAME means that the partition paths and partition ids are
+///   unique, but the file name inside the partition is a single constant.
+/// - PARTITIONED_UNIQUE_FILENAMES means that the partition paths, partition ids, and
+///   the file names inside the partition are all unique.
+/// These policies are mostly irrelevant for single block tables or tables with local
+/// scheduling, so the default policy is UNPARTITIONED.
+enum class BlockNamingPolicy {
+  UNPARTITIONED,
+  PARTITIONED_SINGLE_FILENAME,
+  PARTITIONED_UNIQUE_FILENAMES,
+};
+std::ostream& operator<<(std::ostream& os, const BlockNamingPolicy& naming_policy);
+
 /// Host model. Each host can have either a backend, a datanode, or both. To specify that
 /// a host should not act as a backend or datanode specify '-1' as the respective port.
 /// A host with a backend is always a coordinator but it may not be an executor.
@@ -122,6 +140,13 @@ class Cluster {
   const std::vector<Host>& hosts() const { return hosts_; }
   int NumHosts() const { return hosts_.size(); }

+  /// Helper function to create a cluster with Impala nodes separate from the datanodes.
+  /// This is primarily used for consistent remote scheduling tests. This places
+  /// the impalad nodes first, then the data nodes. Impalad nodes will have indices
+  /// in the range [0, num_impala_nodes-1] and data nodes will have indices in the
+  /// range [num_impala_nodes, num_impala_nodes+num_data_nodes-1].
+  static Cluster CreateRemoteCluster(int num_impala_nodes, int num_data_nodes);
+
   /// These methods return lists of host indexes, grouped by their type, which can be used
   /// to draw samples of random sets of hosts.
   /// TODO: Think of a nicer abstraction to expose this information.
@@ -193,10 +218,14 @@ struct FileSplitGeneratorSpec {
   static const int64_t DEFAULT_BLOCK_SIZE;
 };

-/// A table models multiple partitions, some of which represent their files explicitly
-/// with Blocks or with FileSplitGeneratorSpecs. A table can consist of both
-/// representations.
+/// A table models multiple files. Each file can be represented explicitly with a Block
+/// or with a FileSplitGeneratorSpec. A table can consist of files with both
+/// representations. The table can specify a BlockNamingPolicy, which tailors the
+/// file name and path for scan ranges to simulate partitioned vs unpartitioned tables.
+/// Consistent remote scheduling depends on the file paths, but the file names do not
+/// impact other aspects of scheduling.
 struct Table {
+  BlockNamingPolicy naming_policy = BlockNamingPolicy::UNPARTITIONED;
   std::vector<Block> blocks;
   std::vector<FileSplitGeneratorSpec> specs;
 };
@@ -205,6 +234,10 @@ class Schema {
  public:
   Schema(const Cluster& cluster) : cluster_(cluster) {}

+  /// Add a table with no blocks. This is useful for tables with a custom naming
+  /// policy that later add FileSplitGeneratorSpecs.
+  void AddEmptyTable(const TableName& table_name, BlockNamingPolicy naming_policy);
+
   /// Add a table consisting of a single block to the schema with explicitly specified
   /// replica indexes for non-cached replicas and without any cached replicas. Replica
   /// indexes must refer to hosts in cluster_.hosts() by index.
@@ -226,9 +259,11 @@ class Schema {

   /// Add a table to the schema, selecting replica hosts according to the given replica
   /// placement preference. After replica selection has been done, 'num_cached_replicas'
-  /// of them are marked as cached.
+  /// of them are marked as cached. The table uses the specified 'naming_policy' for
+  /// its blocks.
   void AddMultiBlockTable(const TableName& table_name, int num_blocks,
-      ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas);
+      ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas,
+      BlockNamingPolicy naming_policy);

   /// Adds FileSplitGeneratorSpecs to table named 'table_name'. If the table does not
   /// exist, creates a new table. Otherwise, adds the 'specs' to an existing table.
@@ -293,15 +328,24 @@ class Plan {

   /// Initialize a TScanRangeLocationList object in place.
   void BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
-      int block_idx, TScanRangeLocationList* scan_range_locations);
+      int block_idx, BlockNamingPolicy naming_policy,
+      TScanRangeLocationList* scan_range_locations);
+
+  /// Builds appropriate paths for a Block given the table name and block naming
+  /// policy.
+  void GetBlockPaths(const TableName& table_name, bool is_spec, int index,
+      BlockNamingPolicy naming_policy, string* relative_path, int64_t* partition_id,
+      string* partition_path);

   /// Initializes a scan range for a Block.
   void BuildScanRange(
-      const TableName& table_name, const Block& block, int block_idx, TScanRange* range);
+      const TableName& table_name, const Block& block, int block_idx,
+      BlockNamingPolicy naming_policy, TScanRange* range);

   /// Initializes a scan range for a FileSplitGeneratorSpec.
   void BuildScanRangeSpec(const TableName& table_name, const FileSplitGeneratorSpec& spec,
-      int spec_idx, TFileSplitGeneratorSpec* thrift_spec);
+      int spec_idx, BlockNamingPolicy naming_policy,
+      TFileSplitGeneratorSpec* thrift_spec);

   /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
   /// been added to the plan before, it will add it to 'referenced_datanodes_' and return
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index f7d2e4e..e038034 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -31,6 +31,10 @@ class SchedulerTest : public testing::Test {
   SchedulerTest() { srand(0); }
 };

+static const vector<BlockNamingPolicy> BLOCK_NAMING_POLICIES(
+    {BlockNamingPolicy::UNPARTITIONED, BlockNamingPolicy::PARTITIONED_SINGLE_FILENAME,
+     BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES});
+
 /// Smoke test to schedule a single table with a single scan range over a single host.
 TEST_F(SchedulerTest, SingleHostSingleFile) {
   Cluster cluster;
@@ -214,16 +218,14 @@ TEST_F(SchedulerTest, NoRemoteExecutorCandidates) {
 /// nodes for varying values of num_remote_executor_candidates. This includes cases
 /// where the num_remote_executor_candidates exceeds the number of Impala executors.
 TEST_F(SchedulerTest, RemoteExecutorCandidates) {
-  Cluster cluster;
   int num_data_nodes = 3;
   int num_impala_nodes = 5;
-  // Set of datanodes
-  cluster.AddHosts(num_data_nodes, false, true);
-  // Set of Impala hosts
-  cluster.AddHosts(num_impala_nodes, true, false);
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);

   Schema schema(cluster);
-  schema.AddSingleBlockTable("T1", {0, 1, 2});
+  // CreateRemoteCluster places the Impala nodes first, so the data nodes have indices
+  // of 5, 6, and 7.
+  schema.AddSingleBlockTable("T1", {5, 6, 7});

   // Test a range of number of remote executor candidates, including cases where the
   // number of remote executor candidates exceeds the number of Impala nodes.
@@ -254,18 +256,116 @@ TEST_F(SchedulerTest, RemoteExecutorCandidates) {
   }
 }

+/// Helper function to verify that two things are treated as distinct for consistent
+/// remote placement. The input 'schema' should be created with a Cluster initialized
+/// by Cluster::CreateRemoteCluster() with 50 impalads and 3 data nodes. It should
+/// have a single table named "T" that contains two schedulable entities (blocks, specs,
+/// etc) with size Block::DEFAULT_BLOCK_SIZE that are expected to be distinct. It runs
+/// the scheduler 100 times with random replica set to true and verifies that the number
+/// of distinct backends is in the right range. If two things are distinct and each can
+/// be scheduled on up to 'num_candidates' distinct backends, then the number of distinct
+/// backends should be in the range ['num_candidates' + 1, 2 * 'num_candidates'].
+/// The probability of completely overlapping by chance is extremely low and
+/// SchedulerTests use srand(0) to be deterministic, so this test should only fail if
+/// the entities are no longer considered distinct.
+void RemotePlacementVerifyDistinct(const Schema& schema, int num_candidates) {
+  ASSERT_EQ(schema.cluster().NumHosts(), 53);
+  Plan plan(schema);
+  plan.AddTableScan("T");
+  plan.SetRandomReplica(true);
+  plan.SetNumRemoteExecutorCandidates(num_candidates);
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
+
+  // This is not intended to be used with a larger number of candidates
+  ASSERT_LE(num_candidates, 10);
+  int min_distinct_backends = num_candidates + 1;
+  int max_distinct_backends = 2 * num_candidates;
+  ASSERT_EQ(100, result.NumAssignments());
+  EXPECT_EQ(200, result.NumTotalAssignments());
+  EXPECT_EQ(200 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
+  EXPECT_GE(result.NumDistinctBackends(), min_distinct_backends);
+  EXPECT_LE(result.NumDistinctBackends(), max_distinct_backends);
+}
+
+/// Test that consistent remote placement schedules distinct blocks differently
+TEST_F(SchedulerTest, RemotePlacementBlocksDistinct) {
+  int num_data_nodes = 3;
+  int num_impala_nodes = 50;
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
+
+  // Two blocks (which translate to actual files for a table) should hash differently
+  // and result in different remote placement. This verifies various combinations
+  // corresponding to how files are named.
+  for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
+    SCOPED_TRACE(naming_policy);
+    Schema schema(cluster);
+    int num_blocks = 2;
+    int num_remote_candidates = 3;
+    schema.AddMultiBlockTable("T", num_blocks, ReplicaPlacement::RANDOM,
+        num_remote_candidates, 0, naming_policy);
+    RemotePlacementVerifyDistinct(schema, num_remote_candidates);
+  }
+}
+
+/// Test that consistent remote placement schedules distinct file split generator specs
+/// differently
+TEST_F(SchedulerTest, RemotePlacementSpecsDistinct) {
+  int num_data_nodes = 3;
+  int num_impala_nodes = 50;
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
+
+  // Two specs (which translate to actual files for a table) should hash differently
+  // and result in different remote placement. This verifies that this holds for all
+  // the naming policies.
+  for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
+    SCOPED_TRACE(naming_policy);
+    Schema schema(cluster);
+    int num_remote_candidates = 3;
+    // Add the table with the appropriate naming policy, but without adding any blocks.
+    schema.AddEmptyTable("T", naming_policy);
+    // Add two splits with one block each (i.e. the total size of the split is the
+    // block size).
+    schema.AddFileSplitGeneratorSpecs("T",
+        {{Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, false},
+         {Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, false}});
+    RemotePlacementVerifyDistinct(schema, num_remote_candidates);
+  }
+}
+
+/// Tests that consistent remote placement schedules blocks with distinct offsets
+/// differently
+TEST_F(SchedulerTest, RemotePlacementOffsetsDistinct) {
+  int num_data_nodes = 3;
+  int num_impala_nodes = 50;
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
+
+  // A FileSplitGeneratorSpec that generates two scan ranges should hash the two scan
+  // ranges differently due to different offsets. This is true regardless of the
+  // naming_policy (which should not impact the outcome).
+  for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
+    SCOPED_TRACE(naming_policy);
+    Schema schema(cluster);
+    int num_remote_candidates = 3;
+    // Add the table with the appropriate naming policy, but without adding any blocks.
+    schema.AddEmptyTable("T", naming_policy);
+    // Add a splittable spec that will be split into two scan ranges each of
+    // the default block size.
+    schema.AddFileSplitGeneratorSpecs("T",
+        {{2 * Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, true}});
+    RemotePlacementVerifyDistinct(schema, num_remote_candidates);
+  }
+}
+
 /// Verify basic consistency of remote executor candidates. Specifically, it schedules
-/// a set of blocks, then removes an executor that did not have any blocks assigned to
-/// it, and verifies that rerunning the scheduling results in the same assignments.
+/// a set of blocks, then removes some executors that did not have any blocks assigned to
+/// them, and verifies that rerunning the scheduling results in the same assignments.
 TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
-  Cluster cluster;
   int num_data_nodes = 3;
   int num_impala_nodes = 50;
-
-  // Set of Impala hosts
-  cluster.AddHosts(num_impala_nodes, true, false);
-  // Set of datanodes
-  cluster.AddHosts(num_data_nodes, false, true);
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);

   // Replica placement is unimportant for this test. All blocks will be on
   // all datanodes, but Impala is running remotely.
@@ -275,28 +375,40 @@ TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
   Plan plan(schema);
   plan.AddTableScan("T1");
   plan.SetRandomReplica(false);
-  plan.SetNumRemoteExecutorCandidates(3);
+  // TODO: Consistent scheduling is only completely consistent under removal of an
+  // unused node when the number of executor candidates is 1. See IMPALA-8677.
+  plan.SetNumRemoteExecutorCandidates(1);

   Result result_base(plan);
   SchedulerWrapper scheduler(plan);
   ASSERT_OK(scheduler.Compute(&result_base));
   EXPECT_EQ(25, result_base.NumTotalAssignments());
   EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_base.NumTotalAssignedBytes());
+  EXPECT_GT(result_base.NumDistinctBackends(), 3);

-  // There are 25 blocks and 50 Impala hosts. There will be Impala hosts without
-  // any assigned bytes. Removing one of them should not change the outcome.
-  Result result_empty_removed(plan);
-  // Find an Impala host that was not assigned any bytes and remove it
-  bool removed_one = false;
+  // There are 25 blocks and 50 Impala hosts. There will be at least 25 Impala hosts
+  // without any assigned bytes. Removing some of them should not change the outcome.
+  // Generate a list of the hosts without bytes assigned.
+  vector<int> zerobyte_indices;
   for (int i = 0; i < num_impala_nodes; ++i) {
     if (result_base.NumTotalAssignedBytes(i) == 0) {
-      scheduler.RemoveBackend(cluster.hosts()[i]);
-      removed_one = true;
-      break;
+      zerobyte_indices.push_back(i);
     }
   }
-  ASSERT_TRUE(removed_one);
-  // Rerun the scheduling with the node removed.
+  EXPECT_GE(zerobyte_indices.size(), 25);
+
+  // Remove 5 nodes with zero bytes by picking several indices in the list of
+  // nodes with zero bytes and removing the corresponding backends.
+  vector<int> zerobyte_indices_to_remove({3, 7, 12, 15, 19});
+  int num_removed = 0;
+  for (int index_to_remove : zerobyte_indices_to_remove) {
+    int node_index = zerobyte_indices[index_to_remove];
+    scheduler.RemoveBackend(cluster.hosts()[node_index]);
+    num_removed++;
+  }
+  ASSERT_EQ(num_removed, 5);
+  // Rerun the scheduling with the nodes removed.
+  Result result_empty_removed(plan);
   ASSERT_OK(scheduler.Compute(&result_empty_removed));
   EXPECT_EQ(25, result_empty_removed.NumTotalAssignments());
   EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_empty_removed.NumTotalAssignedBytes());
@@ -304,7 +416,8 @@ TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
   // Verify that the outcome is identical.
   for (int i = 0; i < num_impala_nodes; ++i) {
     EXPECT_EQ(result_base.NumRemoteAssignedBytes(i),
-        result_empty_removed.NumRemoteAssignedBytes(i));
+        result_empty_removed.NumRemoteAssignedBytes(i))
+        << "Mismatch at index " << std::to_string(i);
   }
 }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index d4183e3..bb64f0c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -109,6 +109,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       hdfs_scan_range.__set_offset(scan_range_offset);
       hdfs_scan_range.__set_partition_id(spec.partition_id);
       hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
+      hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
       TScanRange scan_range;
       scan_range.__set_hdfs_file_split(hdfs_scan_range);
       TScanRangeLocationList scan_range_list;
@@ -794,10 +795,12 @@ void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
   // than 'num_candidates'.
   set<IpAddr> distinct_backends;
   // Generate multiple hashes of the file split by using the hash as a seed to a PRNG.
-  // Note: This hashes both the filename and the offset to allow very large files
-  // to be spread across more executors.
-  uint32_t hash = HashUtil::Hash(hdfs_file_split->relative_path.data(),
-      hdfs_file_split->relative_path.length(), 0);
+  // Note: The hash includes the partition path hash, the filename (relative to the
+  // partition directory), and the offset. The offset is used to allow very large files
+  // that have multiple splits to be spread across more executors.
+  uint32_t hash = static_cast<uint32_t>(hdfs_file_split->partition_path_hash);
+  hash = HashUtil::Hash(hdfs_file_split->relative_path.data(),
+      hdfs_file_split->relative_path.length(), hash);
   hash = HashUtil::Hash(&hdfs_file_split->offset, sizeof(hdfs_file_split->offset), hash);
   pcg32 prng(hash);
   // To avoid any problem scenarios, limit the total number of iterations
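To make the candidate-selection step above concrete, here is an illustrative
sketch of how the combined hash drives consistent remote placement. It is not
the actual GetRemoteExecutorCandidates() implementation: std::mt19937 stands in
for Impala's pcg32, executors are plain indices rather than backend IP
addresses, and the iteration cap factor is chosen arbitrarily.

    #include <cstdint>
    #include <random>
    #include <set>
    #include <vector>

    // Seed a PRNG with the combined split hash and draw up to 'num_candidates'
    // distinct executor indices.
    std::vector<int> PickCandidates(
        uint32_t split_hash, int num_executors, int num_candidates) {
      std::mt19937 prng(split_hash);
      std::set<int> distinct;
      std::vector<int> candidates;
      // Cap the iterations to avoid pathological loops, as the real code does
      // (the factor of 3 here is arbitrary).
      const int max_iterations = num_candidates * 3;
      for (int i = 0; i < max_iterations; ++i) {
        if (static_cast<int>(distinct.size()) >= num_candidates) break;
        int executor_idx = static_cast<int>(prng() % num_executors);
        if (distinct.insert(executor_idx).second) candidates.push_back(executor_idx);
      }
      return candidates;  // The same split_hash always yields the same set.
    }

Because the seed is derived deterministically from the partition path hash, the
relative path, and the offset, repeated scheduling runs pick the same
candidates, which is the property the RemoteExecutorCandidateConsistency test
above relies on.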
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index f188c72..eb780e2 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -191,6 +191,12 @@ struct THdfsFileSplit {

   // whether this file is erasure-coded
   8: required bool is_erasure_coded
+
+  // Hash of the partition's path. This must be hashed with a hash algorithm that is
+  // consistent across different processes and machines. This is currently using
+  // Java's String.hashCode(), which is consistent. For testing purposes, this can use
+  // any consistent hash.
+  9: required i32 partition_path_hash
 }

 // key range for single THBaseScanNode
@@ -223,6 +229,9 @@ struct TFileSplitGeneratorSpec {

   // ID of partition within the THdfsTable associated with this scan node.
   4: required i64 partition_id
+
+  // Hash of the partition path
+  5: required i32 partition_path_hash
 }

 // Specification of an individual data range which is held in its entirety
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 4f084f2..a9cf603 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -985,7 +985,8 @@ public class HdfsScanNode extends ScanNode {
       boolean splittable = partition.getFileFormat().isSplittable(
           HdfsCompression.fromFileName(fileDesc.getRelativePath()));
       TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(
-          fileDesc.toThrift(), maxBlockSize, splittable, partition.getId());
+          fileDesc.toThrift(), maxBlockSize, splittable, partition.getId(),
+          partition.getLocation().hashCode());
       scanRangeSpecs_.addToSplit_specs(splitSpec);
       long scanRangeBytes = Math.min(maxBlockSize, fileDesc.getFileLength());
       if (splittable) {
@@ -1051,7 +1052,7 @@ public class HdfsScanNode extends ScanNode {
     scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getRelativePath(),
         currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
         fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
-        fileDesc.getIsEc()));
+        fileDesc.getIsEc(), partition.getLocation().hashCode()));
     TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
     scanRangeLocations.scan_range = scanRange;
     scanRangeLocations.locations = locations;
diff --git a/fe/src/test/java/org/apache/impala/planner/ExplainTest.java b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
index 75c2730..a19d8cc 100644
--- a/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
@@ -161,6 +161,7 @@ public class ExplainTest extends FrontendTestBase {

     when(mockHdfsPartition.getLocationPath())
         .thenReturn(new org.apache.hadoop.fs.Path(path));
+    when(mockHdfsPartition.getLocation()).thenReturn(path);
     when(mockHdfsPartition.getFileDescriptors()).thenReturn(mockFilesDescs);
     when(mockHdfsPartition.getFileFormat()).thenReturn(HdfsFileFormat.PARQUET);
     when(mockHdfsPartition.getFsType()).thenReturn(fsType);
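A note on the "consistent hash" requirement in the PlanNodes.thrift comment
above: Java's String.hashCode() is defined by the language specification as
s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1] in wrapping 32-bit arithmetic, so
every JVM produces the same value for the same string. The following C++
re-implementation (illustrative only, and assuming ASCII partition paths, where
Java's UTF-16 code units coincide with the byte values) shows the computation
the frontend performs before shipping the value to the backends in
partition_path_hash.

    #include <cstdint>
    #include <iostream>
    #include <string>

    // Re-implementation of Java's String.hashCode() for ASCII strings.
    // Unsigned arithmetic avoids signed-overflow UB while matching Java's
    // wrapping 32-bit behavior.
    int32_t JavaStringHashCode(const std::string& s) {
      uint32_t h = 0;
      for (unsigned char c : s) h = 31u * h + c;
      return static_cast<int32_t>(h);
    }

    int main() {
      // Should match what a JVM returns for the same ASCII string.
      std::cout << JavaStringHashCode("/warehouse/store_sales") << std::endl;
    }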