This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a20977a5c0b37d20aab13bb441756f887a2d1c59
Author: Joe McDonnell <joemcdonn...@cloudera.com>
AuthorDate: Thu Jun 6 13:18:13 2019 -0700

    IMPALA-8630: Hash the full path when calculating consistent remote placement
    
    Consistent remote placement currently uses the relative filename within
    a partition for the consistent hash. If the relative filenames for
    different partitions have a simple naming scheme, then multiple
    partitions may have files of the same name. This is true for some
    tables written by Hive (e.g. in our minicluster the tpcds.store_sales table
    has this problem). This can lead to unbalanced placement of remote
    ranges.
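    
    To make the collision concrete, here is a minimal standalone sketch (using
    std::hash for illustration rather than Impala's HashUtil::Hash, and with
    made-up partition directories): hashing only the relative file name cannot
    tell such files apart.
    
        #include <functional>
        #include <iostream>
        #include <string>
        
        int main() {
          // Hypothetical layout: two partitions each contain a file named
          // "000001_0"; only the partition directories differ.
          std::string rel_a = "000001_0";  // e.g. under .../part=2009/
          std::string rel_b = "000001_0";  // e.g. under .../part=2010/
          std::hash<std::string> hasher;
          // Identical hashes mean the same remote executor candidates are
          // chosen for both splits, which skews placement.
          std::cout << (hasher(rel_a) == hasher(rel_b)) << std::endl;  // prints 1
          return 0;
        }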
    
    This adds a partition_path_hash to the THdfsFileSplit and
    THdfsFileSplitGeneratorSpec, calculated in the frontend (which has all of
    the partition information). The scheduler hashes this in addition to
    the relative path.
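    
    A rough sketch of the combined seeding (illustrative only; the hash
    combining below is a generic stand-in, while the actual change feeds the
    new partition_path_hash field, the relative path, and the offset into
    HashUtil::Hash, as in scheduler.cc in the diff):
    
        #include <cstddef>
        #include <cstdint>
        #include <functional>
        #include <string>
        
        // Hypothetical helper: fold the partition path hash in first, then the
        // relative path, then the offset, so splits that differ in any of the
        // three receive different PRNG seeds.
        uint32_t SplitSeed(int32_t partition_path_hash,
                           const std::string& relative_path, int64_t offset) {
          std::size_t h = static_cast<uint32_t>(partition_path_hash);
          h ^= std::hash<std::string>{}(relative_path)
               + 0x9e3779b9 + (h << 6) + (h >> 2);
          h ^= std::hash<int64_t>{}(offset) + 0x9e3779b9 + (h << 6) + (h >> 2);
          return static_cast<uint32_t>(h);
        }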
    
    Testing:
     - Added several new scheduler tests that verify that consistent remote
       scheduling treats blocks with different relative paths, partition paths,
       or offsets as distinct.
     - Ran core tests
    
    Change-Id: I46c739fc31af539af2b3509e2a161f4e29f44d7b
    Reviewed-on: http://gerrit.cloudera.org:8080/13545
    Reviewed-by: Joe McDonnell <joemcdonn...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/scheduling/scheduler-test-util.cc           | 134 +++++++++++++++--
 be/src/scheduling/scheduler-test-util.h            |  60 +++++++-
 be/src/scheduling/scheduler-test.cc                | 165 +++++++++++++++++----
 be/src/scheduling/scheduler.cc                     |  11 +-
 common/thrift/PlanNodes.thrift                     |   9 ++
 .../org/apache/impala/planner/HdfsScanNode.java    |   5 +-
 .../org/apache/impala/planner/ExplainTest.java     |   1 +
 7 files changed, 331 insertions(+), 54 deletions(-)

diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 314b32a..1dd1b8c 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -25,6 +25,7 @@
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/cluster-membership-test-util.h"
 #include "scheduling/scheduler.h"
+#include "util/hash-util.h"
 
 using namespace impala;
 using namespace impala::test;
@@ -32,6 +33,25 @@ using namespace org::apache::impala::fb;
 
 DECLARE_int32(krpc_port);
 
+/// Make the BlockNamingPolicy enum easy to print. Must be declared in impala::test
+namespace impala {
+namespace test {
+std::ostream& operator<<(std::ostream& os, const BlockNamingPolicy& naming_policy)
+{
+  switch(naming_policy) {
+  case BlockNamingPolicy::UNPARTITIONED:
+    os << "UNPARTITIONED"; break;
+  case BlockNamingPolicy::PARTITIONED_SINGLE_FILENAME:
+    os << "PARTITIONED_SINGLE_FILENAME"; break;
+  case BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES:
+    os << "PARTITIONED_UNIQUE_FILENAMES"; break;
+  default: os.setstate(std::ios_base::failbit);
+  }
+  return os;
+}
+}
+}
+
 /// Sample 'n' elements without replacement from the set [0..N-1].
 /// This is an implementation of "Algorithm R" by J. Vitter.
 void SampleN(int n, int N, vector<int>* out) {
@@ -115,6 +135,15 @@ void Cluster::GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
   addr->port = hosts_[host_idx].be_port;
 }
 
+Cluster Cluster::CreateRemoteCluster(int num_impala_nodes, int num_data_nodes) {
+  Cluster cluster;
+  // Set of Impala hosts
+  cluster.AddHosts(num_impala_nodes, true, false);
+  // Set of datanodes
+  cluster.AddHosts(num_data_nodes, false, true);
+  return cluster;
+}
+
 const vector<int>& Cluster::datanode_with_backend_host_idxs() const {
   return datanode_with_backend_host_idxs_;
 }
@@ -123,6 +152,16 @@ const vector<int>& Cluster::datanode_only_host_idxs() const {
   return datanode_only_host_idxs_;
 }
 
+void Schema::AddEmptyTable(
+    const TableName& table_name, BlockNamingPolicy naming_policy) {
+  DCHECK(tables_.find(table_name) == tables_.end());
+  // Create table
+  Table table;
+  table.naming_policy = naming_policy;
+  // Insert table
+  tables_.emplace(table_name, table);
+}
+
 void Schema::AddSingleBlockTable(
     const TableName& table_name, const vector<int>& non_cached_replica_host_idxs) {
   AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
@@ -154,14 +193,17 @@ void Schema::AddSingleBlockTable(const TableName& table_name,
 
 void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
     ReplicaPlacement replica_placement, int num_replicas) {
-  AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0);
+  AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0,
+      BlockNamingPolicy::UNPARTITIONED);
 }
 
 void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
-    ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) {
+    ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas,
+    BlockNamingPolicy naming_policy) {
   DCHECK_GT(num_replicas, 0);
   DCHECK(num_cached_replicas <= num_replicas);
   Table table;
+  table.naming_policy = naming_policy;
   for (int i = 0; i < num_blocks; ++i) {
     Block block;
     vector<int>& replica_idxs = block.replica_host_idxs;
@@ -238,25 +280,28 @@ void Plan::AddTableScan(const TableName& table_name) {
   for (int i = 0; i < blocks.size(); ++i) {
     const Block& block = blocks[i];
     TScanRangeLocationList scan_range_locations;
-    BuildTScanRangeLocationList(table_name, block, i, &scan_range_locations);
+    BuildTScanRangeLocationList(table_name, block, i, table.naming_policy,
+        &scan_range_locations);
     scan_range_specs_.concrete_ranges.push_back(scan_range_locations);
   }
   const vector<FileSplitGeneratorSpec>& specs = table.specs;
   for (int i = 0; i < specs.size(); ++i) {
     const FileSplitGeneratorSpec& file_spec = specs[i];
     TFileSplitGeneratorSpec spec;
-    BuildScanRangeSpec(table_name, file_spec, i, &spec);
+    BuildScanRangeSpec(table_name, file_spec, i, table.naming_policy, &spec);
     scan_range_specs_.split_specs.push_back(spec);
   }
 }
 
 void Plan::BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
-    int block_idx, TScanRangeLocationList* scan_range_locations) {
+    int block_idx, BlockNamingPolicy naming_policy,
+    TScanRangeLocationList* scan_range_locations) {
   const vector<int>& replica_idxs = block.replica_host_idxs;
   const vector<bool>& is_cached = block.replica_host_idx_is_cached;
   DCHECK_EQ(replica_idxs.size(), is_cached.size());
   int num_replicas = replica_idxs.size();
-  BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range);
+  BuildScanRange(table_name, block, block_idx, naming_policy,
+      &scan_range_locations->scan_range);
   scan_range_locations->locations.resize(num_replicas);
   for (int i = 0; i < num_replicas; ++i) {
     TScanRangeLocation& location = scan_range_locations->locations[i];
@@ -265,16 +310,68 @@ void Plan::BuildTScanRangeLocationList(const TableName& table_name, const Block&
   }
 }
 
+void Plan::GetBlockPaths(const TableName& table_name, bool is_spec, int index,
+    BlockNamingPolicy naming_policy, string* relative_path, int64_t* partition_id,
+    string* partition_path) {
+  // For debugging, it is useful to differentiate between Blocks and
+  // FileSplitGeneratorSpecs.
+  string spec_or_block = is_spec ? "_spec_" : "_block_";
+
+  switch (naming_policy) {
+  case BlockNamingPolicy::UNPARTITIONED:
+    // For unpartitioned tables, use unique file names and set partition_id = 0
+    // Encoding the table name and index in the file helps debugging.
+    // For example, an unpartitioned table may contain two files with paths like:
+    // /warehouse/{table_name}/file1.csv
+    // /warehouse/{table_name}/file2.csv
+    *relative_path = table_name + spec_or_block + std::to_string(index);
+    *partition_id = 0;
+    *partition_path = "/warehouse/" + table_name;
+    break;
+  case BlockNamingPolicy::PARTITIONED_SINGLE_FILENAME:
+    // For partitioned tables with simple names, use a simple file name, but vary the
+    // partition id and partition_path by using the index as the partition_id and as
+    // part of the partition_path.
+    // For example, a partitioned table with two partitions might have paths like:
+    // /warehouse/{table_name}/year=2010/000001_0
+    // /warehouse/{table_name}/year=2009/000001_0
+    *relative_path = "000001_0";
+    *partition_id = index;
+    *partition_path = "/warehouse/" + table_name + "/part=" + std::to_string(index);
+    break;
+  case BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES:
+    // For partitioned tables with unique names, the file name, partition_id, and
+    // partition_path all incorporate the index.
+    // For example, a partitioned table with two partitions might have paths like:
+    // /warehouse/{table_name}/year=2010/6541856e3fb0583d_654267678_data.0.parq
+    // /warehouse/{table_name}/year=2009/6541856e3fb0583d_627636719_data.0.parq
+    *relative_path = table_name + spec_or_block + std::to_string(index);
+    *partition_id = index;
+    *partition_path = "/warehouse/" + table_name + "/part=" + std::to_string(index);
+    break;
+  default:
+    DCHECK(false) << "Invalid naming_policy";
+  }
+}
+
 void Plan::BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
-    TScanRange* scan_range) {
+    BlockNamingPolicy naming_policy, TScanRange* scan_range) {
   // Initialize locations.scan_range correctly.
   THdfsFileSplit file_split;
-  // 'length' is the only member considered by the scheduler.
+  // 'length' is the only member considered by the scheduler for scheduling non-remote
+  // blocks.
   file_split.length = block.length;
-  // Encoding the table name and block index in the file helps debugging.
-  file_split.relative_path = table_name + "_block_" + std::to_string(block_idx);
+
+  // Consistent remote scheduling considers the partition_path_hash, relative_path,
+  // and offset when scheduling blocks.
+  string partition_path;
+  GetBlockPaths(table_name, false, block_idx, naming_policy, &file_split.relative_path,
+      &file_split.partition_id, &partition_path);
+  file_split.partition_path_hash =
+      static_cast<int32_t>(HashUtil::Hash(partition_path.data(),
+          partition_path.length(), 0));
   file_split.offset = 0;
-  file_split.partition_id = 0;
+
   // For now, we model each file by a single block.
   file_split.file_length = block.length;
   file_split.file_compression = THdfsCompression::NONE;
@@ -283,13 +380,19 @@ void Plan::BuildScanRange(const TableName& table_name, const Block& block, int b
 }
 
 void Plan::BuildScanRangeSpec(const TableName& table_name,
-    const FileSplitGeneratorSpec& spec, int spec_idx,
+    const FileSplitGeneratorSpec& spec, int spec_idx, BlockNamingPolicy naming_policy,
     TFileSplitGeneratorSpec* thrift_spec) {
   THdfsFileDesc thrift_file;
 
+  string relative_path;
+  int64_t partition_id;
+  string partition_path;
+  GetBlockPaths(table_name, true, spec_idx, naming_policy, &relative_path,
+      &partition_id, &partition_path);
+
   flatbuffers::FlatBufferBuilder fb_builder;
   auto rel_path =
-      fb_builder.CreateString(table_name + "_spec_" + std::to_string(spec_idx));
+      fb_builder.CreateString(relative_path);
   auto fb_file_desc = CreateFbFileDesc(fb_builder, rel_path, spec.length);
   fb_builder.Finish(fb_file_desc);
 
@@ -297,10 +400,13 @@ void Plan::BuildScanRangeSpec(const TableName& table_name,
       reinterpret_cast<const char*>(fb_builder.GetBufferPointer()), fb_builder.GetSize());
   thrift_file.__set_file_desc_data(buffer);
 
-  thrift_spec->__set_partition_id(0);
+  thrift_spec->__set_partition_id(partition_id);
   thrift_spec->__set_file_desc(thrift_file);
   thrift_spec->__set_max_block_size(spec.block_size);
   thrift_spec->__set_is_splittable(spec.is_splittable);
+  int32_t partition_path_hash = static_cast<int32_t>(HashUtil::Hash(partition_path.data(),
+      partition_path.length(), 0));
+  thrift_spec->__set_partition_path_hash(partition_path_hash);
 }
 
 int Plan::FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h
index 30c0f76..4005927 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -87,6 +87,24 @@ enum class ReplicaPlacement {
   REMOTE_ONLY,
 };
 
+/// Blocks and FileSplitGeneratorSpecs mimic real files, and for some parts of scheduling
+/// (e.g. consistent remote scheduling) the actual file names and partition paths matter.
+/// When defining a table, you can specify a block naming policy to control this.
+///  - UNPARTITIONED means that the partition paths and partition ids are constant, but
+///    the file names are unique.
+///  - PARTITIONED_SINGLE_FILENAME means that the partition paths and partition ids are
+///    unique, but the file name inside the partition is a single constant.
+///  - PARTITIONED_UNIQUE_FILENAMES means that the partition paths, partition ids, and
+///    the file names inside the partition are all unique.
+/// These policies are mostly irrelevant for single block tables or tables with local
+/// scheduling, so the default policy is UNPARTITIONED.
+enum class BlockNamingPolicy {
+  UNPARTITIONED,
+  PARTITIONED_SINGLE_FILENAME,
+  PARTITIONED_UNIQUE_FILENAMES,
+};
+std::ostream& operator<<(std::ostream& os, const BlockNamingPolicy& naming_policy);
+
 /// Host model. Each host can have either a backend, a datanode, or both. To specify that
 /// a host should not act as a backend or datanode specify '-1' as the respective port.
 /// A host with a backend is always a coordinator but it may not be an executor.
@@ -122,6 +140,13 @@ class Cluster {
   const std::vector<Host>& hosts() const { return hosts_; }
   int NumHosts() const { return hosts_.size(); }
 
+  /// Helper function to create a cluster with Impala nodes separate from the datanodes.
+  /// This is primarily used for consistent remote scheduling tests. This places
+  /// the impalad nodes first, then the data nodes. Impalad nodes will have indices
+  /// in the range [0, num_impala_nodes-1] and data nodes will have indices in the
+  /// range [num_impala_nodes, num_impala_nodes+num_data_nodes-1].
+  static Cluster CreateRemoteCluster(int num_impala_nodes, int num_data_nodes);
+
   /// These methods return lists of host indexes, grouped by their type, which can be used
   /// to draw samples of random sets of hosts.
   /// TODO: Think of a nicer abstraction to expose this information.
@@ -193,10 +218,14 @@ struct FileSplitGeneratorSpec {
   static const int64_t DEFAULT_BLOCK_SIZE;
 };
 
-/// A table models multiple partitions, some of which represent their files explicitly
-/// with Blocks or with FileSplitGeneratorSpecs. A table can consist of both
-/// representations.
+/// A table models multiple files. Each file can be represented explicitly with a Block
+/// or with a FileSplitGeneratorSpec. A table can consist of files with both
+/// representations. The table can specify a BlockNamingPolicy, which tailors the
+/// file name and path for scan ranges to simulate partitioned vs unpartitioned tables.
+/// Consistent remote scheduling depends on the file paths, but the file names do not
+/// impact other aspects of scheduling.
 struct Table {
+  BlockNamingPolicy naming_policy = BlockNamingPolicy::UNPARTITIONED;
   std::vector<Block> blocks;
   std::vector<FileSplitGeneratorSpec> specs;
 };
@@ -205,6 +234,10 @@ class Schema {
  public:
   Schema(const Cluster& cluster) : cluster_(cluster) {}
 
+  /// Add a table with no blocks. This is useful for tables with a custom naming
+  /// policy that later add FileSplitGeneratorSpecs.
+  void AddEmptyTable(const TableName& table_name, BlockNamingPolicy naming_policy);
+
   /// Add a table consisting of a single block to the schema with explicitly specified
   /// replica indexes for non-cached replicas and without any cached replicas. Replica
   /// indexes must refer to hosts in cluster_.hosts() by index.
@@ -226,9 +259,11 @@ class Schema {
 
   /// Add a table to the schema, selecting replica hosts according to the given replica
   /// placement preference. After replica selection has been done, 'num_cached_replicas'
-  /// of them are marked as cached.
+  /// of them are marked as cached. The table uses the specified 'naming_policy' for
+  /// its blocks.
   void AddMultiBlockTable(const TableName& table_name, int num_blocks,
-      ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas);
+      ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas,
+      BlockNamingPolicy naming_policy);
 
   /// Adds FileSplitGeneratorSpecs to table named 'table_name'. If the table does not
   /// exist, creates a new table. Otherwise, adds the 'specs' to an existing table.
@@ -293,15 +328,24 @@ class Plan {
 
   /// Initialize a TScanRangeLocationList object in place.
   void BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
-      int block_idx, TScanRangeLocationList* scan_range_locations);
+      int block_idx, BlockNamingPolicy naming_policy,
+      TScanRangeLocationList* scan_range_locations);
+
+  /// Builds appropriate paths for a Block given the table name and block naming
+  /// policy.
+  void GetBlockPaths(const TableName& table_name, bool is_spec, int index,
+      BlockNamingPolicy naming_policy, string* relative_path, int64_t* partition_id,
+      string* partition_path);
 
   /// Initializes a scan range for a Block.
   void BuildScanRange(
-      const TableName& table_name, const Block& block, int block_idx, TScanRange* range);
+      const TableName& table_name, const Block& block, int block_idx,
+      BlockNamingPolicy naming_policy, TScanRange* range);
 
   /// Initializes a scan range for a FileSplitGeneratorSpec.
   void BuildScanRangeSpec(const TableName& table_name, const FileSplitGeneratorSpec& spec,
-      int spec_idx, TFileSplitGeneratorSpec* thrift_spec);
+      int spec_idx, BlockNamingPolicy naming_policy,
+      TFileSplitGeneratorSpec* thrift_spec);
 
   /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
   /// been added to the plan before, it will add it to 'referenced_datanodes_' and return
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index f7d2e4e..e038034 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -31,6 +31,10 @@ class SchedulerTest : public testing::Test {
   SchedulerTest() { srand(0); }
 };
 
+static const vector<BlockNamingPolicy> BLOCK_NAMING_POLICIES(
+    {BlockNamingPolicy::UNPARTITIONED, BlockNamingPolicy::PARTITIONED_SINGLE_FILENAME,
+     BlockNamingPolicy::PARTITIONED_UNIQUE_FILENAMES});
+
 /// Smoke test to schedule a single table with a single scan range over a single host.
 TEST_F(SchedulerTest, SingleHostSingleFile) {
   Cluster cluster;
@@ -214,16 +218,14 @@ TEST_F(SchedulerTest, NoRemoteExecutorCandidates) {
 /// nodes for varying values of num_remote_executor_candidates. This includes cases
 /// where the num_remote_executor_candidates exceeds the number of Impala executors.
 TEST_F(SchedulerTest, RemoteExecutorCandidates) {
-  Cluster cluster;
   int num_data_nodes = 3;
   int num_impala_nodes = 5;
-  // Set of datanodes
-  cluster.AddHosts(num_data_nodes, false, true);
-  // Set of Impala hosts
-  cluster.AddHosts(num_impala_nodes, true, false);
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
 
   Schema schema(cluster);
-  schema.AddSingleBlockTable("T1", {0, 1, 2});
+  // CreateRemoteCluster places the Impala nodes first, so the data nodes have indices
+  // of 5, 6, and 7.
+  schema.AddSingleBlockTable("T1", {5, 6, 7});
 
   // Test a range of number of remote executor candidates, including cases where the
   // number of remote executor candidates exceeds the number of Impala nodes.
@@ -254,18 +256,116 @@ TEST_F(SchedulerTest, RemoteExecutorCandidates) {
   }
 }
 
+/// Helper function to verify that two things are treated as distinct for consistent
+/// remote placement. The input 'schema' should be created with a Cluster initialized
+/// by Cluster::CreateRemoteCluster() with 50 impalads and 3 data nodes. It should
+/// have a single table named "T" that contains two schedulable entities (blocks, specs,
+/// etc) with size Block::DEFAULT_BLOCK_SIZE that are expected to be distinct. It runs
+/// the scheduler 100 times with random replica set to true and verifies that the number
+/// of distinct backends is in the right range. If two things are distinct and each can
+/// be scheduled on up to 'num_candidates' distinct backends, then the number of distinct
+/// backends should be in the range ['num_candidates' + 1, 2 * 'num_candidates'].
+/// The probability of completely overlapping by chance is extremely low and
+/// SchedulerTests use srand(0) to be deterministic, so this test should only fail if
+/// the entities are no longer considered distinct.
+void RemotePlacementVerifyDistinct(const Schema& schema, int num_candidates) {
+  ASSERT_EQ(schema.cluster().NumHosts(), 53);
+  Plan plan(schema);
+  plan.AddTableScan("T");
+  plan.SetRandomReplica(true);
+  plan.SetNumRemoteExecutorCandidates(num_candidates);
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
+
+  // This is not intended to be used with a larger number of candidates
+  ASSERT_LE(num_candidates, 10);
+  int min_distinct_backends = num_candidates + 1;
+  int max_distinct_backends = 2 * num_candidates;
+  ASSERT_EQ(100, result.NumAssignments());
+  EXPECT_EQ(200, result.NumTotalAssignments());
+  EXPECT_EQ(200 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
+  EXPECT_GE(result.NumDistinctBackends(), min_distinct_backends);
+  EXPECT_LE(result.NumDistinctBackends(), max_distinct_backends);
+}
+
+/// Test that consistent remote placement schedules distinct blocks differently
+TEST_F(SchedulerTest, RemotePlacementBlocksDistinct) {
+  int num_data_nodes = 3;
+  int num_impala_nodes = 50;
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
+
+  // Two blocks (which translate to actual files for a table) should hash differently
+  // and result in different remote placement. This verifies various combinations
+  // corresponding to how files are named.
+  for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
+    SCOPED_TRACE(naming_policy);
+    Schema schema(cluster);
+    int num_blocks = 2;
+    int num_remote_candidates = 3;
+    schema.AddMultiBlockTable("T", num_blocks, ReplicaPlacement::RANDOM,
+        num_remote_candidates, 0, naming_policy);
+    RemotePlacementVerifyDistinct(schema, num_remote_candidates);
+  }
+}
+
+/// Test that consistent remote placement schedules distinct file split generator specs
+/// differently
+TEST_F(SchedulerTest, RemotePlacementSpecsDistinct) {
+  int num_data_nodes = 3;
+  int num_impala_nodes = 50;
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
+
+  // Two specs (which translate to actual files for a table) should hash differently
+  // and result in different remote placement. This verifies that this is true for all
+  // the naming policies.
+  for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
+    SCOPED_TRACE(naming_policy);
+    Schema schema(cluster);
+    int num_remote_candidates = 3;
+    // Add the table with the appropriate naming policy, but without adding any blocks.
+    schema.AddEmptyTable("T", naming_policy);
+    // Add two splits with one block each (i.e. the total size of the split is the
+    // block size).
+    schema.AddFileSplitGeneratorSpecs("T",
+        {{Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, false},
+         {Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, false}});
+    RemotePlacementVerifyDistinct(schema, num_remote_candidates);
+  }
+}
+
+/// Tests that consistent remote placement schedules blocks with distinct offsets
+/// differently
+TEST_F(SchedulerTest, RemotePlacementOffsetsDistinct) {
+  int num_data_nodes = 3;
+  int num_impala_nodes = 50;
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
+
+  // A FileSplitGeneratorSpec that generates two scan ranges should hash the two scan
+  // ranges differently due to different offsets. This is true regardless of the
+  // naming_policy (which should not impact the outcome).
+  for (BlockNamingPolicy naming_policy : BLOCK_NAMING_POLICIES) {
+    SCOPED_TRACE(naming_policy);
+    Schema schema(cluster);
+    int num_remote_candidates = 3;
+    // Add the table with the appropriate naming policy, but without adding any blocks.
+    schema.AddEmptyTable("T", naming_policy);
+    // Add a splittable spec that will be split into two scan ranges each of
+    // the default block size.
+    schema.AddFileSplitGeneratorSpecs("T",
+        {{2 * Block::DEFAULT_BLOCK_SIZE, Block::DEFAULT_BLOCK_SIZE, true}});
+    RemotePlacementVerifyDistinct(schema, num_remote_candidates);
+  }
+}
+
 /// Verify basic consistency of remote executor candidates. Specifically, it schedules
-/// a set of blocks, then removes an executor that did not have any blocks assigned to
-/// it, and verifies that rerunning the scheduling results in the same assignments.
+/// a set of blocks, then removes some executors that did not have any blocks assigned to
+/// them, and verifies that rerunning the scheduling results in the same assignments.
 TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
-  Cluster cluster;
   int num_data_nodes = 3;
   int num_impala_nodes = 50;
-
-  // Set of Impala hosts
-  cluster.AddHosts(num_impala_nodes, true, false);
-  // Set of datanodes
-  cluster.AddHosts(num_data_nodes, false, true);
+  Cluster cluster = Cluster::CreateRemoteCluster(num_impala_nodes, num_data_nodes);
 
   // Replica placement is unimportant for this test. All blocks will be on
   // all datanodes, but Impala is running remotely.
@@ -275,28 +375,40 @@ TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
   Plan plan(schema);
   plan.AddTableScan("T1");
   plan.SetRandomReplica(false);
-  plan.SetNumRemoteExecutorCandidates(3);
+  // TODO: Consistent scheduling is only completely consistent with respect to removing
+  // an unused node when the number of executor candidates is 1. See IMPALA-8677.
+  plan.SetNumRemoteExecutorCandidates(1);
 
   Result result_base(plan);
   SchedulerWrapper scheduler(plan);
   ASSERT_OK(scheduler.Compute(&result_base));
   EXPECT_EQ(25, result_base.NumTotalAssignments());
   EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_base.NumTotalAssignedBytes());
+  EXPECT_GT(result_base.NumDistinctBackends(), 3);
 
-  // There are 25 blocks and 50 Impala hosts. There will be Impala hosts without
-  // any assigned bytes. Removing one of them should not change the outcome.
-  Result result_empty_removed(plan);
-  // Find an Impala host that was not assigned any bytes and remove it
-  bool removed_one = false;
+  // There are 25 blocks and 50 Impala hosts. There will be at least 25 Impala hosts
+  // without any assigned bytes. Removing some of them should not change the outcome.
+  // Generate a list of the hosts without bytes assigned.
+  vector<int> zerobyte_indices;
   for (int i = 0; i < num_impala_nodes; ++i) {
     if (result_base.NumTotalAssignedBytes(i) == 0) {
-      scheduler.RemoveBackend(cluster.hosts()[i]);
-      removed_one = true;
-      break;
+      zerobyte_indices.push_back(i);
     }
   }
-  ASSERT_TRUE(removed_one);
-  // Rerun the scheduling with the node removed.
+  EXPECT_GE(zerobyte_indices.size(), 25);
+
+  // Remove 5 nodes with zero bytes by picking several indices in the list of
+  // nodes with zero bytes and removing the corresponding backends.
+  vector<int> zerobyte_indices_to_remove({3, 7, 12, 15, 19});
+  int num_removed = 0;
+  for (int index_to_remove : zerobyte_indices_to_remove) {
+    int node_index = zerobyte_indices[index_to_remove];
+    scheduler.RemoveBackend(cluster.hosts()[node_index]);
+    num_removed++;
+  }
+  ASSERT_EQ(num_removed, 5);
+  // Rerun the scheduling with the nodes removed.
+  Result result_empty_removed(plan);
   ASSERT_OK(scheduler.Compute(&result_empty_removed));
   EXPECT_EQ(25, result_empty_removed.NumTotalAssignments());
   EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_empty_removed.NumTotalAssignedBytes());
@@ -304,7 +416,8 @@ TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
   // Verify that the outcome is identical.
   for (int i = 0; i < num_impala_nodes; ++i) {
     EXPECT_EQ(result_base.NumRemoteAssignedBytes(i),
-              result_empty_removed.NumRemoteAssignedBytes(i));
+        result_empty_removed.NumRemoteAssignedBytes(i))
+        << "Mismatch at index " << std::to_string(i);
   }
 }
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index d4183e3..bb64f0c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -109,6 +109,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       hdfs_scan_range.__set_offset(scan_range_offset);
       hdfs_scan_range.__set_partition_id(spec.partition_id);
       hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
+      hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
       TScanRange scan_range;
       scan_range.__set_hdfs_file_split(hdfs_scan_range);
       TScanRangeLocationList scan_range_list;
@@ -794,10 +795,12 @@ void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
   // than 'num_candidates'.
   set<IpAddr> distinct_backends;
   // Generate multiple hashes of the file split by using the hash as a seed to a PRNG.
-  // Note: This hashes both the filename and the offset to allow very large files
-  // to be spread across more executors.
-  uint32_t hash = HashUtil::Hash(hdfs_file_split->relative_path.data(),
-      hdfs_file_split->relative_path.length(), 0);
+  // Note: The hash includes the partition path hash, the filename (relative to the
+  // partition directory), and the offset. The offset is used to allow very large files
+  // that have multiple splits to be spread across more executors.
+  uint32_t hash = static_cast<uint32_t>(hdfs_file_split->partition_path_hash);
+  hash = HashUtil::Hash(hdfs_file_split->relative_path.data(),
+      hdfs_file_split->relative_path.length(), hash);
   hash = HashUtil::Hash(&hdfs_file_split->offset, sizeof(hdfs_file_split->offset), hash);
   pcg32 prng(hash);
   // To avoid any problem scenarios, limit the total number of iterations
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index f188c72..eb780e2 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -191,6 +191,12 @@ struct THdfsFileSplit {
 
   // whether this file is erasure-coded
   8: required bool is_erasure_coded
+
+  // Hash of the partition's path. This must be hashed with a hash algorithm that is
+  // consistent across different processes and machines. This is currently using
+  // Java's String.hashCode(), which is consistent. For testing purposes, this can use
+  // any consistent hash.
+  9: required i32 partition_path_hash
 }
 
 // key range for single THBaseScanNode
@@ -223,6 +229,9 @@ struct TFileSplitGeneratorSpec {
 
   // ID of partition within the THdfsTable associated with this scan node.
   4: required i64 partition_id
+
+  // Hash of the partition path
+  5: required i32 partition_path_hash
 }
 
 // Specification of an individual data range which is held in its entirety
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 4f084f2..a9cf603 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -985,7 +985,8 @@ public class HdfsScanNode extends ScanNode {
     boolean splittable = partition.getFileFormat().isSplittable(
         HdfsCompression.fromFileName(fileDesc.getRelativePath()));
     TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(
-        fileDesc.toThrift(), maxBlockSize, splittable, partition.getId());
+        fileDesc.toThrift(), maxBlockSize, splittable, partition.getId(),
+        partition.getLocation().hashCode());
     scanRangeSpecs_.addToSplit_specs(splitSpec);
     long scanRangeBytes = Math.min(maxBlockSize, fileDesc.getFileLength());
     if (splittable) {
@@ -1051,7 +1052,7 @@ public class HdfsScanNode extends ScanNode {
         scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getRelativePath(),
             currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
             fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
-            fileDesc.getIsEc()));
+            fileDesc.getIsEc(), partition.getLocation().hashCode()));
         TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
         scanRangeLocations.scan_range = scanRange;
         scanRangeLocations.locations = locations;
diff --git a/fe/src/test/java/org/apache/impala/planner/ExplainTest.java b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
index 75c2730..a19d8cc 100644
--- a/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/ExplainTest.java
@@ -161,6 +161,7 @@ public class ExplainTest extends FrontendTestBase {
 
     when(mockHdfsPartition.getLocationPath())
         .thenReturn(new org.apache.hadoop.fs.Path(path));
+    when(mockHdfsPartition.getLocation()).thenReturn(path);
     when(mockHdfsPartition.getFileDescriptors()).thenReturn(mockFilesDescs);
     when(mockHdfsPartition.getFileFormat()).thenReturn(HdfsFileFormat.PARQUET);
     when(mockHdfsPartition.getFsType()).thenReturn(fsType);
