This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4477398ae IMPALA-12818: Intermediate Result Caching plan node framework
4477398ae is described below

commit 4477398ae46415d3fb32db2a8fd5e6d2060cbd3f
Author: Kurt Deschler <kdesc...@cloudera.com>
AuthorDate: Tue Mar 7 13:13:39 2023 -0500

    IMPALA-12818: Intermediate Result Caching plan node framework
    
    This patch adds a plan node framework for caching of intermediate result
    tuples within a query. Actual caching of data will be implemented in
    subsequent patches.
    
    A new plan node type TupleCacheNode is introduced for brokering caching
    decisions at runtime. If the result is in the cache, the TupleCacheNode will
    return results from the cache and skip executing its child node. If the
    result is not cached, the TupleCacheNode will execute its child node and
    mirror the resulting RowBatches to the cache.
    
    The TupleCachePlanner decides where to place the TupleCacheNodes. To
    calculate eligibility and cache keys, the plan must be in a stable state
    that will not change shape. TupleCachePlanner currently runs at the end
    of planning after the DistributedPlanner and ParallelPlanner have run.
    As a first cut, TupleCachePlanner places TupleCacheNodes at every
    eligible location. Eligibility is currently restricted to immediately
    above HdfsScanNodes. This implementation will need to incorporate cost
    heuristics and other policies for placement.
    
    Each TupleCacheNode has a hash key that is generated from the logical
    plan below for the purpose of identifying results that have been cached
    by semantically equivalent query subtrees. The initial implementation of
    the subtree hash uses the plan Thrift to uniquely identify the subtree.
    
    Tuple caching is enabled by setting the enable_tuple_cache query option
    to true. As a safeguard during development, enable_tuple_cache can only
    be set to true if the "allow_tuple_caching" startup option is set to
    true. The allow_tuple_caching startup option defaults to false to
    minimize the impact on production clusters.
    bin/start-impala-cluster.py sets allow_tuple_caching=true by default
    to enable tuple caching in the development environment.
    
    Testing:
     - This adds a frontend test that does basic checks for cache keys and
       eligibility
     - This verifies the presence of the caching information in the explain
       plan output.
    
    Change-Id: Ia1f36a87dcce6efd5d1e1f0bc04009bf009b1961
    Reviewed-on: http://gerrit.cloudera.org:8080/21035
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Reviewed-by: Michael Smith <michael.sm...@cloudera.com>
    Reviewed-by: Yida Wu <wydbaggio...@gmail.com>
    Reviewed-by: Kurt Deschler <kdesc...@cloudera.com>
---
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/exec-node.cc                           |   4 +
 be/src/exec/tuple-cache-node.cc                    |  93 +++++++
 be/src/exec/tuple-cache-node.h                     |  53 ++++
 be/src/service/query-options.cc                    |  10 +
 be/src/service/query-options.h                     |   3 +-
 bin/start-impala-cluster.py                        |   9 +
 common/thrift/ImpalaService.thrift                 |   5 +
 common/thrift/PlanNodes.thrift                     |   9 +
 common/thrift/Query.thrift                         |   3 +
 .../impala/common/ThriftSerializationCtx.java      |  55 +++++
 .../org/apache/impala/planner/HdfsScanNode.java    |   3 +
 .../java/org/apache/impala/planner/PlanNode.java   |  87 ++++++-
 .../java/org/apache/impala/planner/Planner.java    |  29 ++-
 .../org/apache/impala/planner/TupleCacheInfo.java  | 157 ++++++++++++
 .../org/apache/impala/planner/TupleCacheNode.java  | 125 ++++++++++
 .../apache/impala/planner/TupleCachePlanner.java   | 103 ++++++++
 .../apache/impala/planner/TupleCacheInfoTest.java  |  98 ++++++++
 .../org/apache/impala/planner/TupleCacheTest.java  | 270 +++++++++++++++++++++
 .../queries/QueryTest/explain-level1.test          |  17 +-
 .../queries/QueryTest/explain-level2.test          |   8 +
 21 files changed, 1125 insertions(+), 17 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index a9c5c61de..c98b8f70a 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -104,6 +104,7 @@ add_library(Exec
   table-sink-base.cc
   text-converter.cc
   topn-node.cc
+  tuple-cache-node.cc
   union-node.cc
   unnest-node.cc
 )
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 745d9d51c..8130378ac 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -51,6 +51,7 @@
 #include "exec/streaming-aggregation-node.h"
 #include "exec/subplan-node.h"
 #include "exec/topn-node.h"
+#include "exec/tuple-cache-node.h"
 #include "exec/union-node.h"
 #include "exec/unnest-node.h"
 #include "exprs/expr.h"
@@ -230,6 +231,9 @@ Status PlanNode::CreatePlanNode(
     case TPlanNodeType::ICEBERG_METADATA_SCAN_NODE:
       *node = pool->Add(new IcebergMetadataScanPlanNode());
       break;
+    case TPlanNodeType::TUPLE_CACHE_NODE:
+      *node = pool->Add(new TupleCachePlanNode());
+      break;
     default:
       map<int, const char*>::const_iterator i =
           _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
new file mode 100644
index 000000000..a63f64765
--- /dev/null
+++ b/be/src/exec/tuple-cache-node.cc
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+
+#include "exec/tuple-cache-node.h"
+#include "exec/exec-node-util.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
+
+// Global feature flag for tuple caching. If false, enable_tuple_cache cannot 
be true
+// and the coordinator cannot produce plans with TupleCacheNodes.
+DEFINE_bool(allow_tuple_caching, false, "If false, tuple caching cannot be 
used.");
+
+namespace impala {
+
+Status TupleCachePlanNode::CreateExecNode(
+    RuntimeState* state, ExecNode** node) const {
+  ObjectPool* pool = state->obj_pool();
+  *node = pool->Add(new TupleCacheNode(pool, *this, state->desc_tbl()));
+  return Status::OK();
+}
+
+TupleCacheNode::TupleCacheNode(
+    ObjectPool* pool, const TupleCachePlanNode& pnode, const DescriptorTbl& 
descs)
+    : ExecNode(pool, pnode, descs)
+    , subtree_hash_(pnode.tnode_->tuple_cache_node.subtree_hash) {
+}
+
+Status TupleCacheNode::Open(RuntimeState* state) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  ScopedOpenEventAdder ea(this);
+  RETURN_IF_ERROR(ExecNode::Open(state));
+
+  // The frontend only plans TupleCacheNodes when enable_tuple_cache=true, so
+  // seeing one here with the option false is invalid. Fail the query.
+  if (!state->query_options().enable_tuple_cache) {
+    return Status("Invalid tuple caching configuration: 
enable_tuple_cache=false");
+  }
+
+  RETURN_IF_ERROR(child(0)->Open(state));
+
+  return Status::OK();
+}
+
+Status TupleCacheNode::GetNext(
+    RuntimeState* state, RowBatch* output_row_batch, bool* eos) {
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  ScopedGetNextEventAdder ea(this, eos);
+  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
+  RETURN_IF_CANCELLED(state);
+  RETURN_IF_ERROR(QueryMaintenance(state));
+
+  // Save the number of rows in case GetNext() is called with a non-empty 
batch,
+  // which can happen in a subplan.
+  int num_rows_before = output_row_batch->num_rows();
+
+  RETURN_IF_ERROR(child(0)->GetNext(state, output_row_batch, eos));
+
+  // Note: TupleCacheNode does not alter its child's output (or the equivalent
+  // output from the cache), so it does not enforce its own limit on the 
output.
+  // Any limit should be enforced elsewhere, and this code omits the logic
+  // to enforce a limit.
+  int num_rows_added = output_row_batch->num_rows() - num_rows_before;
+  DCHECK_GE(num_rows_added, 0);
+  IncrementNumRowsReturned(num_rows_added);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
+  return Status::OK();
+}
+
+void TupleCacheNode::DebugString(int indentation_level, std::stringstream* 
out) const {
+  *out << string(indentation_level * 2, ' ');
+  *out << "TupleCacheNode(" << subtree_hash_;
+  ExecNode::DebugString(indentation_level, out);
+  *out << ")";
+}
+
+}
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
new file mode 100644
index 000000000..43401bb97
--- /dev/null
+++ b/be/src/exec/tuple-cache-node.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "exec/exec-node.h"
+
+namespace impala {
+
+class TupleCachePlanNode : public PlanNode {
+ public:
+  Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  ~TupleCachePlanNode(){}
+};
+
+/// Node that caches rows produced by a child node.
+///
+/// This is currently a stub implementation that simply returns the RowBatch 
produced
+/// by its child node.
+///
+/// FUTURE:
+/// This node looks up the subtree_hash_ in the tuple cache. If an entry 
exists, this
+/// reads rows from the cache rather than executing its child node. If the 
entry does not
+/// exist, this will read rows from its child, write them to the cache, and 
return them.
+class TupleCacheNode : public ExecNode {
+ public:
+  TupleCacheNode(ObjectPool* pool, const TupleCachePlanNode& pnode,
+      const DescriptorTbl& descs);
+
+  Status Open(RuntimeState* state) override;
+  Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  void DebugString(int indentation_level, std::stringstream* out) const 
override;
+private:
+  const std::string subtree_hash_;
+};
+
+}
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c31c8818c..7f834b38f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -46,6 +46,7 @@ using namespace impala;
 using namespace strings;
 
 DECLARE_int32(idle_session_timeout);
+DECLARE_bool(allow_tuple_caching);
 
 void impala::OverlayQueryOptions(
     const TQueryOptions& src, const QueryOptionsMask& mask, TQueryOptions* 
dst) {
@@ -1214,6 +1215,15 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         query_options->__set_query_cpu_count_divisor(double_val);
         break;
       }
+      case TImpalaQueryOptions::ENABLE_TUPLE_CACHE: {
+        bool enable_tuple_cache = IsTrue(value);
+        if (enable_tuple_cache && !FLAGS_allow_tuple_caching) {
+          return Status(
+              "Tuple caching is disabled, so enable_tuple_cache cannot be set 
to true.");
+        }
+        query_options->__set_enable_tuple_cache(enable_tuple_cache);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index defa0dd7f..254a7ccea 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                       
          \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                       
          \
-      TImpalaQueryOptions::QUERY_CPU_COUNT_DIVISOR + 1);                       
          \
+      TImpalaQueryOptions::ENABLE_TUPLE_CACHE + 1);                            
          \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)     
          \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)   
          \
@@ -323,6 +323,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::DEVELOPMENT)                                          
          \
   QUERY_OPT_FN(query_cpu_count_divisor,                                        
          \
       QUERY_CPU_COUNT_DIVISOR, TQueryOptionLevel::ADVANCED)                    
          \
+  QUERY_OPT_FN(enable_tuple_cache, ENABLE_TUPLE_CACHE, 
TQueryOptionLevel::ADVANCED)      \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 7d2db963f..a05bc04f0 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -173,6 +173,10 @@ parser.add_option("--reduce_disk_io_threads", 
default="True", type="choice",
                   choices=["true", "True", "false", "False"],
                   help="If true, reduce the number of disk io mgr threads for "
                   "filesystems that are not the TARGET_FILESYSTEM.")
+parser.add_option("--disable_tuple_caching", default=False, 
action="store_true",
+                  help="If true, sets the tuple caching feature flag "
+                  "(allow_tuple_caching) to false. This defaults to false to 
enable "
+                  "tuple caching in the development environment")
 
 # For testing: list of comma-separated delays, in milliseconds, that delay 
impalad catalog
 # replica initialization. The ith delay is applied to the ith impalad.
@@ -618,6 +622,11 @@ def build_impalad_arg_lists(cluster_size, 
num_coordinators, use_exclusive_coordi
       args = "-jni_frontend_class={jni_frontend_class} {args}".format(
           jni_frontend_class=options.jni_frontend_class, args=args)
 
+    if options.disable_tuple_caching:
+      args = "-allow_tuple_caching=false {args}".format(args=args)
+    else:
+      args = "-allow_tuple_caching=true {args}".format(args=args)
+
     # Appended at the end so they can override previous args.
     if i < len(per_impalad_args):
       args = "{args} {per_impalad_args}".format(
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index d308463dc..6d74339ef 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -916,6 +916,11 @@ enum TImpalaQueryOptions {
   // If this query option is not set, value of backend flag 
--query_cpu_count_divisor
   // (default to 1.0) will be picked up instead.
   QUERY_CPU_COUNT_DIVISOR = 173
+
+  // Enables intermediate result caching. The frontend will determine 
eligibility and
+  // potentially insert tuple cache nodes into the plan. This can only be set 
if the
+  // allow_tuple_caching feature startup flag is set to true.
+  ENABLE_TUPLE_CACHE = 174
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 1c7333571..329fba709 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -53,6 +53,7 @@ enum TPlanNodeType {
   MULTI_AGGREGATION_NODE = 17
   ICEBERG_DELETE_NODE = 18
   ICEBERG_METADATA_SCAN_NODE = 19
+  TUPLE_CACHE_NODE = 20
 }
 
 // phases of an execution node
@@ -702,6 +703,12 @@ struct TIcebergMetadataScanNode {
   3: required string metadata_table_name;
 }
 
+struct TTupleCacheNode {
+  // Cache key that includes a hashed representation of the entire subtree 
below
+  // this point in the plan.
+  1: required string subtree_hash
+}
+
 // See PipelineMembership in the frontend for details.
 struct TPipelineMembership {
   1: required Types.TPlanNodeId pipe_id
@@ -761,6 +768,8 @@ struct TPlanNode {
   26: required ResourceProfile.TBackendResourceProfile resource_profile
 
   27: optional TCardinalityCheckNode cardinality_check_node
+
+  28: optional TTupleCacheNode tuple_cache_node
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index ae1787fa4..60122ad40 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -696,6 +696,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   174: optional double query_cpu_count_divisor
+
+  // See comment in ImpalaService.thrift
+  175: optional bool enable_tuple_cache = false;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and 
external
diff --git 
a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java 
b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
new file mode 100644
index 000000000..a0d245cb9
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+import org.apache.impala.planner.TupleCacheInfo;
+
+/**
+ * The Thrift serialization functions need to adjust output based on whether 
the
+ * Thrift serialization is happening for tuple caching or not. This context is
+ * passed to Thrift serialization functions like initThrift() or toThrift().
+ * The context currently provides a way to determine whether serialization is 
happening
+ * for tuple caching. It will be expanded to provide other methods that the 
Thrift
+ * serialization functions can call to translate the Thrift output or register
+ * non-deterministic operations.
+ */
+public class ThriftSerializationCtx {
+  private TupleCacheInfo tupleCacheInfo_;
+
+  /**
+   * Constructor for tuple caching serialization
+   */
+  public ThriftSerializationCtx(TupleCacheInfo tupleCacheInfo) {
+    tupleCacheInfo_ = tupleCacheInfo;
+  }
+
+  /**
+   * Constructor for normal serialization
+   */
+  public ThriftSerializationCtx() {
+    tupleCacheInfo_ = null;
+  }
+
+  /**
+   * Returns whether serialization is on behalf of the tuple cache or not. 
This is
+   * intended to be used to mask out unnecessary fields.
+   */
+  public boolean isTupleCache() {
+    return tupleCacheInfo_ != null;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 82aea8319..697ea76d5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -2606,4 +2606,7 @@ public class HdfsScanNode extends ScanNode {
   }
 
   protected boolean isAllColumnarScanner() { return allColumnarFormat_; }
+
+  @Override
+  public boolean isTupleCachingImplemented() { return true; }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index e14195f12..f7a5467ca 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -39,6 +39,7 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.common.ThriftSerializationCtx;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import org.apache.impala.thrift.TExecNodePhase;
@@ -170,6 +171,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // is involved.
   protected boolean hasHardEstimates_ = false;
 
+  protected TupleCacheInfo tupleCacheInfo_;
+
   protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String 
displayName) {
     this(id, displayName);
     tupleIds_.addAll(tupleIds);
@@ -490,10 +493,11 @@ abstract public class PlanNode extends TreeNode<PlanNode> 
{
     return result;
   }
 
-  // Append a flattened version of this plan node, including all children in 
the same
-  // fragment, to 'container'.
-  private void treeToThriftHelper(TPlan container) {
-    TPlanNode msg = new TPlanNode();
+  /**
+   * Common initialization of TPlanNode used for all different types of 
PlanNodes.
+   * This is called before calling toThrift().
+   */
+  private void initThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
     msg.node_id = id_.asInt();
     msg.limit = limit_;
 
@@ -520,12 +524,29 @@ abstract public class PlanNode extends TreeNode<PlanNode> 
{
       msg.addToRuntime_filters(filter.toThrift());
     }
     msg.setDisable_codegen(disableCodegen_);
-    Preconditions.checkState(nodeResourceProfile_.isValid());
-    msg.resource_profile = nodeResourceProfile_.toThrift();
     msg.pipelines = new ArrayList<>();
-    for (PipelineMembership pipe : pipelines_) {
-      msg.pipelines.add(pipe.toThrift());
+    if (serialCtx.isTupleCache()) {
+      // At the point when TupleCachePlanner runs, the pipelines and resource 
profile
+      // have not been calculated yet. These should not be in the cache key 
anyway, so
+      // mask them out.
+      //
+      // resource_profile is a required field, so use a generic struct.
+      msg.resource_profile = ResourceProfile.noReservation(0).toThrift();
+    } else {
+      Preconditions.checkState(nodeResourceProfile_.isValid());
+      msg.resource_profile = nodeResourceProfile_.toThrift();
+      for (PipelineMembership pipe : pipelines_) {
+        msg.pipelines.add(pipe.toThrift());
+      }
     }
+  }
+
+  // Append a flattened version of this plan node, including all children in 
the same
+  // fragment, to 'container'.
+  private void treeToThriftHelper(TPlan container) {
+    TPlanNode msg = new TPlanNode();
+    ThriftSerializationCtx serialCtx = new ThriftSerializationCtx();
+    initThrift(msg, serialCtx);
     toThrift(msg);
     container.addToNodes(msg);
     // For the purpose of the BE consider cross-fragment children (i.e.
@@ -1199,4 +1220,54 @@ abstract public class PlanNode extends 
TreeNode<PlanNode> {
       child.reduceCardinalityByRuntimeFilter(nodeStack, reductionScale);
     }
   }
+
+  public TupleCacheInfo getTupleCacheInfo() {
+    return tupleCacheInfo_;
+  }
+
+  /**
+   * Compute the tuple cache eligibility and keys via a bottom-up tree 
traversal.
+   *
+   * In order to be eligible, a node's children need to all be eligible and 
the node
+   * must explicitly support tuple caching. TODO: The computation of 
eligibility will
+   * incorporate more information about whether the computation is 
non-deterministic.
+   *
+   * This computes the cache key by hashing Thrift structures, but it only 
computes
+   * the key if the node is eligible to avoid overhead.
+   */
+  public void computeTupleCacheInfo() {
+    tupleCacheInfo_ = new TupleCacheInfo();
+    // computing the tuple cache information is a bottom-up tree traversal,
+    // so visit and merge the children before processing this node's contents
+    for (int i = 0; i < getChildCount(); i++) {
+      getChild(i).computeTupleCacheInfo();
+      tupleCacheInfo_.mergeChild(getChild(i).getTupleCacheInfo());
+    }
+
+    if (!isTupleCachingImplemented()) {
+      
tupleCacheInfo_.setIneligible(TupleCacheInfo.IneligibilityReason.NOT_IMPLEMENTED);
+    }
+
+    // If we are already ineligible, there is no need to process the Thrift
+    if (!tupleCacheInfo_.isEligible()) {
+      tupleCacheInfo_.finalize();
+      return;
+    }
+
+    // Incorporate this node's information
+    // TODO: This will also calculate eligibility via initThrift/toThrift.
+    // TODO: This will adjust the output of initThrift/toThrift to mask out 
items.
+    TPlanNode msg = new TPlanNode();
+    ThriftSerializationCtx serialCtx = new 
ThriftSerializationCtx(tupleCacheInfo_);
+    initThrift(msg, serialCtx);
+    toThrift(msg);
+    tupleCacheInfo_.hashThrift(msg);
+    tupleCacheInfo_.finalize();
+  }
+
+  /**
+   * Whether this node supports tuple caching. All nodes start ineligible and 
then
+   * need to explicitly enable it.
+   */
+  public boolean isTupleCachingImplemented() { return false; }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 61031b036..f3c0b5bfc 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -291,13 +291,23 @@ public class Planner {
   public List<PlanFragment> createPlans() throws ImpalaException {
     List<PlanFragment> distrPlan = createPlanFragments();
     Preconditions.checkNotNull(distrPlan);
-    if (!useParallelPlan(ctx_)) {
-      return Collections.singletonList(distrPlan.get(0));
-    }
-    ParallelPlanner planner = new ParallelPlanner(ctx_);
-    List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
-    ctx_.getTimeline().markEvent("Parallel plans created");
-    return parallelPlans;
+    if (useParallelPlan(ctx_)) {
+      ParallelPlanner parallelPlanner = new ParallelPlanner(ctx_);
+      distrPlan = parallelPlanner.createPlans(distrPlan.get(0));
+      ctx_.getTimeline().markEvent("Parallel plans created");
+    } else {
+      distrPlan = Collections.singletonList(distrPlan.get(0));
+    }
+    // TupleCachePlanner comes last, because it needs to compute the 
eligibility of
+    // various locations in the PlanNode tree. Runtime filters and other 
modifications
+    // to the tree can change this, so this comes after all those 
modifications are
+    // complete.
+    if (useTupleCache(ctx_)) {
+      TupleCachePlanner cachePlanner = new TupleCachePlanner(ctx_);
+      distrPlan = cachePlanner.createPlans(distrPlan);
+      ctx_.getTimeline().markEvent("Tuple caching plan created");
+    }
+    return distrPlan;
   }
 
   /**
@@ -314,6 +324,11 @@ public class Planner {
     return queryOptions.getMt_dop() > 0 || 
queryOptions.isCompute_processing_cost();
   }
 
+  // Return true if ENABLE_TUPLE_CACHE=true
+  public static boolean useTupleCache(PlannerContext planCtx) {
+    return planCtx.getQueryOptions().isEnable_tuple_cache();
+  }
+
   /**
    * Return combined explain string for all plan fragments.
    * Includes the estimated resource requirements from the request if set.
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
new file mode 100644
index 000000000..0d3835870
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import java.util.EnumSet;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+import com.google.common.base.Preconditions;
+
+/**
+ * TupleCacheInfo stores the eligibility and cache key information for a 
PlanNode.
+ * It is calculated by a bottom-up traversal of the plan node tree.
+ *
+ * In order to be eligible, a node's children must all be eligible and the 
node must
+ * implement tuple caching support. Eligibility will expand to require 
deterministic
+ * execution.
+ *
+ * The cache key for a node is a combination of the cache keys of children and
+ * the hash of a node's Thrift structure (and any other linked Thrift 
structures).
+ * To support this, it provides a function to incorporate any Thrift 
structure's
+ * contents into the hash.
+ *
+ * For debuggability, this keeps a human-readable trace of what has been 
incorporated
+ * into the cache key. This will help track down why two cache keys are 
different.
+ *
+ * This accumulates information from various sources, then it is finalized and 
cannot
+ * be modified further. The hash key and hash trace cannot be accessed until 
finalize()
+ * is called.
+ */
+public class TupleCacheInfo {
+  // Keep track of the reasons why a location in the plan is ineligible. This may be
+  // multiple things, and it is useful to keep the various causes separate.
+  public enum IneligibilityReason {
+    NOT_IMPLEMENTED,
+    CHILDREN_INELIGIBLE,
+  }
+  private EnumSet<IneligibilityReason> ineligibilityReasons_;
+
+  // These fields accumulate partial results until finalize() is called.
+  private Hasher hasher_ = Hashing.murmur3_128().newHasher();
+
+  // The hash trace keeps a human-readable record of the items hashed into the cache key.
+  private StringBuilder hashTraceBuilder_ = new StringBuilder();
+
+  // When finalize() is called, these final values are filled in and the hasher and
+  // hash trace builder are destroyed.
+  private boolean finalized_ = false;
+  private String finalizedHashTrace_ = null;
+  private String finalizedHashString_ = null;
+
+  /**
+   * Creates an empty, eligible, not-yet-finalized TupleCacheInfo.
+   */
+  public TupleCacheInfo() {
+    ineligibilityReasons_ = EnumSet.noneOf(IneligibilityReason.class);
+  }
+
+  /**
+   * Records a reason why this location is ineligible for caching. Multiple reasons
+   * may be recorded. Throws IllegalStateException if called after finalize().
+   */
+  public void setIneligible(IneligibilityReason reason) {
+    Preconditions.checkState(!finalized_,
+        "TupleCacheInfo is finalized and can't be modified");
+    ineligibilityReasons_.add(reason);
+  }
+
+  /**
+   * Returns true if no ineligibility reason has been recorded. This may be called
+   * before or after finalize().
+   */
+  public boolean isEligible() {
+    return ineligibilityReasons_.isEmpty();
+  }
+
+  /**
+   * Returns the finalized hash as a string. Throws IllegalStateException if this is
+   * ineligible or finalize() has not been called yet.
+   */
+  public String getHashString() {
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only has a hash if it is cache eligible");
+    Preconditions.checkState(finalized_, "TupleCacheInfo not finalized");
+    return finalizedHashString_;
+  }
+
+  /**
+   * Returns the finalized human-readable hash trace. Throws IllegalStateException if
+   * this is ineligible or finalize() has not been called yet.
+   */
+  public String getHashTrace() {
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only has a hash trace if it is cache eligible");
+    Preconditions.checkState(finalized_, "TupleCacheInfo not finalized");
+    return finalizedHashTrace_;
+  }
+
+  /**
+   * Finish accumulating information and calculate the final hash value and
+   * hash trace. This must be called before accessing the hash or hash trace.
+   * No further modifications can be made after calling finalize().
+   *
+   * NOTE: this method has the same signature as java.lang.Object.finalize() and
+   * therefore overrides it, so besides the explicit planner call it may also be
+   * invoked by the JVM's finalization machinery. To be safe under repeated
+   * invocation, calls after the first are no-ops (previously a second call failed
+   * with a NullPointerException on the discarded hasher).
+   * TODO: rename this method so it no longer overrides Object.finalize(); that
+   * requires updating the callers as well.
+   */
+  public void finalize() {
+    if (finalized_) return;
+    finalizedHashString_ = hasher_.hash().toString();
+    hasher_ = null;
+    finalizedHashTrace_ = hashTraceBuilder_.toString();
+    hashTraceBuilder_ = null;
+    finalized_ = true;
+  }
+
+  /**
+   * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
+   * ineligible, then this is marked ineligible and there is no need to calculate
+   * a hash. If the child is eligible, it incorporates the child's hash into this
+   * hash. An eligible child must already be finalized, because its hash string and
+   * hash trace are read here.
+   */
+  public void mergeChild(TupleCacheInfo child) {
+    Preconditions.checkState(!finalized_,
+        "TupleCacheInfo is finalized and can't be modified");
+    if (!child.isEligible()) {
+      ineligibilityReasons_.add(IneligibilityReason.CHILDREN_INELIGIBLE);
+    } else {
+      // The child is eligible, so incorporate its hash into our hasher. Use an
+      // explicit charset so the bytes fed to the hasher (and thus the cache key) do
+      // not depend on the JVM's default charset.
+      hasher_.putBytes(
+          child.getHashString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+      // Also, aggregate its hash trace into ours.
+      // TODO: It might be more useful to have the hash trace just for this
+      // node. We could display each node's hash trace in explain plan,
+      // and each contribution would be clear.
+      hashTraceBuilder_.append(child.getHashTrace());
+    }
+  }
+
+  /**
+   * All Thrift objects inherit from TBase, so this function can incorporate any Thrift
+   * object into the hash. The object's binary-protocol serialization is fed to the
+   * hasher and its toString() representation is appended to the hash trace.
+   * Throws IllegalStateException if called after finalize() or if serialization fails.
+   */
+  public void hashThrift(TBase<?, ?> thriftObj) {
+    Preconditions.checkState(!finalized_,
+        "TupleCacheInfo is finalized and can't be modified");
+    try {
+      TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+      hasher_.putBytes(serializer.serialize(thriftObj));
+    } catch (org.apache.thrift.TException e) {
+      // This should not happen. Throwing an unchecked exception avoids needing to
+      // include the exception in the function specification. The original exception
+      // is attached as the cause so its stack trace is not lost.
+      throw new IllegalStateException("Unexpected Thrift exception: " + e.toString(), e);
+    }
+    // All Thrift objects have a toString() function with a human-readable
+    // representation of all fields that have been set. Looking at the implementation,
+    // Thrift's toString() function doesn't return null.
+    String thriftString = thriftObj.toString();
+    Preconditions.checkState(thriftString != null);
+    hashTraceBuilder_.append(thriftString);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
new file mode 100644
index 000000000..1ef7f944d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.thrift.TTupleCacheNode;
+import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TPlanNode;
+import org.apache.impala.thrift.TPlanNodeType;
+import org.apache.impala.thrift.TQueryOptions;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.HashCode;
+
+/**
+ * Node that serves as a caching intermediary with its child node. If the 
cache contains
+ * a matching entry, this node serves up tuples from the cache and prevents 
its child
+ * from executing. If the cache does not contain a matching entry, this 
executes its child
+ * and mirrors the tuples into the cache.
+ */
+public class TupleCacheNode extends PlanNode {
+
+  // Hash string taken from the child's TupleCacheInfo; shown as the cache key in
+  // explain output and sent to the backend in the Thrift node.
+  protected String subtreeHash_;
+  // Human-readable trace, taken from the child's TupleCacheInfo, of what was hashed.
+  protected String hashTrace_;
+
+  /**
+   * Builds a cache node directly above 'child'. The child must be cache eligible and
+   * its TupleCacheInfo must already be finalized, since the hash and trace are read
+   * here. Cardinality and limit are copied from the child.
+   */
+  public TupleCacheNode(PlanNodeId id, PlanNode child) {
+    super(id, "TUPLE CACHE");
+    addChild(child);
+    cardinality_ = child.getCardinality();
+    limit_ = child.limit_;
+
+    TupleCacheInfo cacheInfo = child.getTupleCacheInfo();
+    Preconditions.checkState(cacheInfo.isEligible());
+    subtreeHash_ = cacheInfo.getHashString();
+    hashTrace_ = cacheInfo.getHashTrace();
+  }
+
+  @Override
+  public void init(Analyzer analyzer) throws ImpalaException {
+    super.init(analyzer);
+    computeTupleIds();
+  }
+
+  /**
+   * This node passes its child's rows through, so it exposes exactly the child's
+   * table ref ids, tuple ids and nullable tuple ids.
+   */
+  @Override
+  public void computeTupleIds() {
+    clearTupleIds();
+    PlanNode input = getChild(0);
+    tblRefIds_.addAll(input.getTblRefIds());
+    tupleIds_.addAll(input.getTupleIds());
+    nullableTupleIds_.addAll(input.getNullableTupleIds());
+  }
+
+  @Override
+  protected void toThrift(TPlanNode msg) {
+    msg.node_type = TPlanNodeType.TUPLE_CACHE_NODE;
+    // TupleCacheNode does not enforce limits itself. It returns its child's output
+    // unchanged and it expects other parts of the plan to enforce limits.
+    // Assert that there is no limit placed on this node.
+    Preconditions.checkState(!hasLimit(),
+        "TupleCacheNode does not enforce limits itself and cannot have a limit set.");
+    TTupleCacheNode thriftNode = new TTupleCacheNode();
+    thriftNode.setSubtree_hash(subtreeHash_);
+    msg.setTuple_cache_node(thriftNode);
+  }
+
+  @Override
+  public void computeNodeResourceProfile(TQueryOptions queryOptions) {
+    Preconditions.checkNotNull(
+        fragment_, "PlanNode must be placed into a fragment before calling this method.");
+
+    long bufferSize = computeMaxSpillableBufferSize(
+        queryOptions.getDefault_spillable_buffer_size(), queryOptions.getMax_row_size());
+
+    // No memory estimate beyond the reservation.
+    long memEstimateBytes = 0;
+    // Adequate space to serialize/deserialize a RowBatch.
+    long minReservationBytes = 2 * bufferSize;
+
+    ResourceProfileBuilder builder = new ResourceProfileBuilder();
+    nodeResourceProfile_ = builder
+        .setMemEstimateBytes(memEstimateBytes)
+        .setMinMemReservationBytes(minReservationBytes)
+        .setSpillableBufferBytes(bufferSize)
+        .setMaxRowBufferBytes(bufferSize)
+        .build();
+  }
+
+  /**
+   * Produces the node header line, a "cache key" line, and the hash trace split into
+   * bracketed chunks. The trace is printed regardless of 'detailLevel'.
+   */
+  @Override
+  protected String getNodeExplainString(String prefix, String detailPrefix,
+      TExplainLevel detailLevel) {
+    StringBuilder explain = new StringBuilder();
+    explain.append(prefix).append(id_.toString()).append(":").append(displayName_)
+        .append("\n");
+    explain.append(detailPrefix).append("cache key: ").append(subtreeHash_).append("\n");
+
+    // For debuggability, always print the hash trace until the cache key calculation
+    // matures. Print trace in chunks to avoid excessive wrapping and padding in
+    // impala-shell. There are other explain lines at VERBOSE level that are
+    // over 100 chars long so we limit the key chunk length similarly here.
+    final int chunkWidth = 100;
+    final int traceLength = hashTrace_.length();
+    int chunkStart = 0;
+    while (chunkStart < traceLength) {
+      int chunkEnd = Math.min(traceLength, chunkStart + chunkWidth);
+      explain.append(detailPrefix).append("[")
+          .append(hashTrace_, chunkStart, chunkEnd).append("]\n");
+      chunkStart = chunkEnd;
+    }
+    return explain.toString();
+  }
+
+  public String getSubtreeHash() { return subtreeHash_; }
+
+  @Override
+  public void computeProcessingCost(TQueryOptions queryOptions) {
+    processingCost_ = ProcessingCost.basicCost(getDisplayLabel(), getCardinality(), 0);
+  }
+
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
new file mode 100644
index 000000000..ebc049a8f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCachePlanner.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import java.util.List;
+
+import org.apache.impala.common.ImpalaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The TupleCachePlanner adds TupleCacheNodes to an existing distributed plan 
tree.
+ * Calculating eligibility and cache keys for locations in the plan tree 
requires that
+ * the plan tree be in a stable form that won't later change. That means that 
this is
+ * designed to run as the last step in planning.
+ *
+ * The current algorithm is to add a TupleCacheNode at every eligible 
location. This will
+ * need to be refined with cost calculations later.
+ */
+public class TupleCachePlanner {
+  private final static Logger LOG = 
LoggerFactory.getLogger(TupleCachePlanner.class);
+
+  private final PlannerContext ctx_;
+
+  public TupleCachePlanner(PlannerContext ctx) {
+    ctx_ = ctx;
+  }
+
+  /**
+   * This takes an existing distributed plan, computes the eligibility and 
cache keys,
+   * then adds TupleCacheNodes at eligible locations.
+   */
+  public List<PlanFragment> createPlans(List<PlanFragment> plan) throws 
ImpalaException {
+
+    // Start at the root of the PlanNode tree
+    PlanNode root = plan.get(0).getPlanRoot();
+    // Step 1: Compute the TupleCacheInfo for all PlanNodes
+    root.computeTupleCacheInfo();
+
+    // Step 2: Build up the new PlanNode tree with TupleCacheNodes added
+    PlanNode newRoot = buildCachingPlan(root);
+    // Since buildCachingPlan is modifying things in place, verify that the 
top-most plan
+    // fragment's plan root matches with the newRoot returned.
+    Preconditions.checkState(plan.get(0).getPlanRoot() == newRoot);
+
+    // We may add some extra PlanNodes in the tree, but nothing we do will 
impact the
+    // number of fragments or the shape of the plan. We are modifying the 
PlanFragments
+    // in place and can just return the same list.
+    return plan;
+  }
+
+  /**
+   * Add TupleCacheNodes at every eligible location via a bottom-up traversal 
of the tree.
+   */
+  private PlanNode buildCachingPlan(PlanNode node) throws ImpalaException {
+    // Recurse through the children applying the caching policy
+    for (int i = 0; i < node.getChildCount(); i++) {
+      node.setChild(i, buildCachingPlan(node.getChild(i)));
+    }
+
+    // If this node is not eligible, then we are done
+    if (!node.getTupleCacheInfo().isEligible()) {
+      return node;
+    }
+
+    // Should we cache above this node?
+    // Simplest policy: always cache if eligible
+    // TODO: Make this more complicated (e.g. cost calculations)
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Adding TupleCacheNode above node " + node.getId().toString());
+    }
+    // Allocate TupleCacheNode
+    TupleCacheNode tupleCacheNode = new TupleCacheNode(ctx_.getNextNodeId(), 
node);
+    tupleCacheNode.init(ctx_.getRootAnalyzer());
+    PlanFragment curFragment = node.getFragment();
+    if (node == curFragment.getPlanRoot()) {
+      // If this is the top of a fragment, update the fragment plan root
+      curFragment.addPlanRoot(tupleCacheNode);
+      return tupleCacheNode;
+    } else {
+      // If this is not the top of a fragment, then set the fragment
+      tupleCacheNode.setFragment(curFragment);
+      return tupleCacheNode;
+    }
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheInfoTest.java 
b/fe/src/test/java/org/apache/impala/planner/TupleCacheInfoTest.java
new file mode 100644
index 000000000..82e5827a9
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheInfoTest.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.impala.thrift.TUniqueId;
+
+import org.junit.Test;
+
+/**
+ * Basic unit tests for TupleCacheInfo
+ */
+public class TupleCacheInfoTest {
+
+  /**
+   * Hashing the same Thrift object into two separate TupleCacheInfos must produce
+   * the same hash trace and the same hash value.
+   */
+  @Test
+  public void testHashThrift() {
+    TupleCacheInfo info1 = new TupleCacheInfo();
+    info1.hashThrift(new TUniqueId(1L, 2L));
+    info1.finalize();
+
+    TupleCacheInfo info2 = new TupleCacheInfo();
+    info2.hashThrift(new TUniqueId(1L, 2L));
+    info2.finalize();
+
+    // JUnit's assertEquals takes (expected, actual); keeping that order makes
+    // failure messages label the two values correctly.
+    assertEquals("TUniqueId(hi:1, lo:2)", info1.getHashTrace());
+    assertEquals(info1.getHashTrace(), info2.getHashTrace());
+    // Hashes are stable over time, so check the actual hash value
+    assertEquals("b3f5384f81770c6adb83209b2a171dfa", info1.getHashString());
+    assertEquals(info1.getHashString(), info2.getHashString());
+  }
+
+  /**
+   * Merging eligible children concatenates their hash traces, in merge order, ahead
+   * of anything hashed into the parent afterwards.
+   */
+  @Test
+  public void testMergeHash() {
+    TupleCacheInfo child1 = new TupleCacheInfo();
+    child1.hashThrift(new TUniqueId(1L, 2L));
+    child1.finalize();
+
+    TupleCacheInfo child2 = new TupleCacheInfo();
+    child2.hashThrift(new TUniqueId(3L, 4L));
+    child2.finalize();
+
+    TupleCacheInfo parent = new TupleCacheInfo();
+    parent.mergeChild(child1);
+    parent.mergeChild(child2);
+    parent.hashThrift(new TUniqueId(5L, 6L));
+    parent.finalize();
+
+    assertEquals("TUniqueId(hi:1, lo:2)TUniqueId(hi:3, lo:4)TUniqueId(hi:5, lo:6)",
+        parent.getHashTrace());
+    // Hashes are stable over time, so check the actual hash value
+    assertEquals("edf5633bed2280c3c3edb703182f3122", parent.getHashString());
+  }
+
+  /**
+   * A parent stays eligible while only eligible children are merged and becomes
+   * ineligible as soon as one ineligible child is merged.
+   */
+  @Test
+  public void testMergeEligibility() {
+    // Child 1 is eligible
+    TupleCacheInfo child1 = new TupleCacheInfo();
+    child1.hashThrift(new TUniqueId(1L, 2L));
+    child1.finalize();
+    assertTrue(child1.isEligible());
+
+    // Child 2 is ineligible
+    TupleCacheInfo child2 = new TupleCacheInfo();
+    child2.setIneligible(TupleCacheInfo.IneligibilityReason.NOT_IMPLEMENTED);
+    child2.finalize();
+    assertTrue(!child2.isEligible());
+
+    TupleCacheInfo parent = new TupleCacheInfo();
+    parent.mergeChild(child1);
+    // Still eligible after adding child1 without child2
+    assertTrue(parent.isEligible());
+    parent.mergeChild(child2);
+    // It is allowed to check eligibility before finalize()
+    assertTrue(!parent.isEligible());
+    parent.finalize();
+
+    assertTrue(!parent.isEligible());
+  }
+
+}
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java 
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
new file mode 100644
index 000000000..91269e74a
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.Frontend.PlanCtx;
+import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.thrift.TQueryCtx;
+import org.apache.impala.thrift.TQueryOptions;
+import org.junit.Test;
+
+/**
+ * Test the planner's tuple cache node keys and eligibility
+ *
+ * This uses test infrastructure adapted from CardinalityTest to run a query 
and produce
+ * the query plan. It then checks for TupleCacheNodes and examines their keys. 
This
+ * relies on TupleCachePlanner placing TupleCacheNodes at all eligible 
locations.
+ */
+public class TupleCacheTest extends PlannerTestBase {
+
+  /**
+   * Test some basic cases for tuple cache keys
+   */
+  @Test
+  public void testTupleCacheKeys() {
+    verifyIdenticalCacheKeys("select id from functional.alltypes",
+        "select id from functional.alltypes");
+    verifyDifferentCacheKeys("select id from functional.alltypes",
+        "select id from functional.alltypestiny");
+    // TODO: incorporate column information into the key
+    // verifyDifferentCacheKeys("select id from functional.alltypes",
+    //     "select int_col from functional.alltypes");
+    verifyDifferentCacheKeys("select id from functional.alltypes",
+        "select id from functional_parquet.alltypes");
+    verifyDifferentCacheKeys("select id from functional.alltypes",
+        "select id from functional.alltypes where id = 1");
+    verifyDifferentCacheKeys("select id from functional.alltypes where id = 1",
+        "select id from functional.alltypes where id = 2");
+
+    // TODO: mask out the table alias so it doesn't result in different cache keys
+    // verifyIdenticalCacheKeys("select id from functional.alltypes",
+    //     "select id from functional.alltypes a");
+    verifyIdenticalCacheKeys("select id from functional.alltypes",
+        "select id as a from functional.alltypes");
+
+    // Tuple caching not implemented for Kudu or HBase
+    verifyCacheIneligible("select id from functional_kudu.alltypes");
+    verifyCacheIneligible("select id from functional_hbase.alltypes");
+
+    // TODO: Random functions should make a location ineligible
+    // rand()/random()/uuid()
+    // verifyCacheIneligible(
+    //     "select id from functional.alltypes where id < 7300 * rand()");
+    // verifyCacheIneligible(
+    //     "select id from functional.alltypes where id < 7300 * random()");
+    // verifyCacheIneligible(
+    //     "select id from functional.alltypes where string_col != uuid()");
+
+    // TODO: Time functions are replaced by constant folding
+    // now()/current_date()/current_timestamp()/unix_timestamp()/utc_timestamp()/etc.
+    // verifyCacheIneligible(
+    //     "select timestamp_col from functional.alltypes where timestamp_col < now()");
+    // verifyCacheIneligible(
+    //     "select date_col from functional.date_tbl where date_col < current_date()");
+  }
+
+  /**
+   * Plans 'query' and returns, in pre-order, every PlanNode of the first fragment's
+   * tree whose TupleCacheInfo is eligible. The TupleCacheNodes themselves are skipped.
+   */
+  protected List<PlanNode> getCacheEligibleNodes(String query) {
+    List<PlanFragment> plan = getPlan(query);
+    PlanNode planRoot = plan.get(0).getPlanRoot();
+    // Walk over the plan and collect the cache eligible nodes.
+    List<PlanNode> preOrderPlanNodes = planRoot.getNodesPreOrder();
+    List<PlanNode> cacheEligibleNodes = new ArrayList<PlanNode>();
+    for (PlanNode node : preOrderPlanNodes) {
+      if (node instanceof TupleCacheNode) continue;
+      TupleCacheInfo info = node.getTupleCacheInfo();
+      if (info.isEligible()) {
+        cacheEligibleNodes.add(node);
+      }
+    }
+
+    return cacheEligibleNodes;
+  }
+
+  /**
+   * Returns the cache key (hash string) of each node, in the same order as the input.
+   */
+  private List<String> getCacheKeys(List<PlanNode> cacheEligibleNodes) {
+    List<String> cacheKeys = new ArrayList<String>();
+    for (PlanNode node : cacheEligibleNodes) {
+      cacheKeys.add(node.getTupleCacheInfo().getHashString());
+    }
+    return cacheKeys;
+  }
+
+  /**
+   * Returns the hash trace of each node, in the same order as the input.
+   */
+  private List<String> getCacheHashTraces(List<PlanNode> cacheEligibleNodes) {
+    List<String> cacheHashTraces = new ArrayList<String>();
+    for (PlanNode node : cacheEligibleNodes) {
+      cacheHashTraces.add(node.getTupleCacheInfo().getHashTrace());
+    }
+    return cacheHashTraces;
+  }
+
+  /**
+   * Returns the set of strings that occur in both lists.
+   */
+  private static Set<String> intersection(List<String> first, List<String> second) {
+    Set<String> common = new HashSet<String>(first);
+    common.retainAll(new HashSet<String>(second));
+    return common;
+  }
+
+  private void printCacheEligibleNode(PlanNode node, StringBuilder log) {
+    log.append(node.getDisplayLabel());
+    log.append("\n");
+    log.append("cache key: ");
+    log.append(node.getTupleCacheInfo().getHashString());
+    log.append("\n");
+    log.append("cache key hash trace: ");
+    log.append(node.getTupleCacheInfo().getHashTrace());
+    log.append("\n");
+  }
+
+  private void printQueryCacheEligibleNodes(String query,
+      List<PlanNode> cacheEligibleNodes, StringBuilder log) {
+    log.append("Query: ");
+    log.append(query);
+    log.append("\n");
+    for (PlanNode node : cacheEligibleNodes) {
+      printCacheEligibleNode(node, log);
+    }
+  }
+
+  /**
+   * Fails the test with 'header' followed by a dump of the eligible nodes (label,
+   * cache key and hash trace) of both queries.
+   */
+  private void failWithEligibleNodes(String header, String query1,
+      List<PlanNode> cacheEligibleNodes1, String query2,
+      List<PlanNode> cacheEligibleNodes2) {
+    StringBuilder errorLog = new StringBuilder();
+    errorLog.append(header);
+    printQueryCacheEligibleNodes(query1, cacheEligibleNodes1, errorLog);
+    printQueryCacheEligibleNodes(query2, cacheEligibleNodes2, errorLog);
+    fail(errorLog.toString());
+  }
+
+  /**
+   * Verifies that the plan for 'query' has no cache eligible nodes at all.
+   */
+  protected void verifyCacheIneligible(String query) {
+    List<PlanNode> cacheEligibleNodes = getCacheEligibleNodes(query);
+
+    // No eligible locations
+    if (cacheEligibleNodes.size() != 0) {
+      StringBuilder errorLog = new StringBuilder();
+      errorLog.append("Expected no cache eligible nodes. Instead found:\n");
+      printQueryCacheEligibleNodes(query, cacheEligibleNodes, errorLog);
+      fail(errorLog.toString());
+    }
+  }
+
+  /**
+   * Verifies that both queries produce the same ordered list of cache keys and the
+   * same ordered list of hash traces, and that the first query has at least one
+   * eligible node.
+   */
+  protected void verifyIdenticalCacheKeys(String query1, String query2) {
+    List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1);
+    List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2);
+    assertTrue(cacheEligibleNodes1.size() > 0);
+    List<String> cacheKeys1 = getCacheKeys(cacheEligibleNodes1);
+    List<String> cacheKeys2 = getCacheKeys(cacheEligibleNodes2);
+    List<String> cacheHashTraces1 = getCacheHashTraces(cacheEligibleNodes1);
+    List<String> cacheHashTraces2 = getCacheHashTraces(cacheEligibleNodes2);
+    if (!cacheKeys1.equals(cacheKeys2) || !cacheHashTraces1.equals(cacheHashTraces2)) {
+      failWithEligibleNodes("Expected identical cache keys. Instead found:\n",
+          query1, cacheEligibleNodes1, query2, cacheEligibleNodes2);
+    }
+  }
+
+  /**
+   * Verifies that the two queries share at least one cache key (and hash trace).
+   */
+  protected void verifyOverlappingCacheKeys(String query1, String query2) {
+    List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1);
+    List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2);
+
+    // None of this makes any sense unless they both have eligible nodes
+    assertTrue(cacheEligibleNodes1.size() > 0);
+    assertTrue(cacheEligibleNodes2.size() > 0);
+
+    Set<String> keyIntersection = intersection(
+        getCacheKeys(cacheEligibleNodes1), getCacheKeys(cacheEligibleNodes2));
+    Set<String> hashTraceIntersection = intersection(
+        getCacheHashTraces(cacheEligibleNodes1), getCacheHashTraces(cacheEligibleNodes2));
+
+    // The hash trace for a cache key should be a one-to-one thing, so
+    // any difference in keys should be a difference in hash traces.
+    assertEquals(keyIntersection.size(), hashTraceIntersection.size());
+
+    if (keyIntersection.isEmpty() || hashTraceIntersection.isEmpty()) {
+      failWithEligibleNodes("Expected overlapping cache keys. Instead found:\n",
+          query1, cacheEligibleNodes1, query2, cacheEligibleNodes2);
+    }
+  }
+
+  /**
+   * Verifies that the two queries have no cache key (and no hash trace) in common.
+   */
+  protected void verifyDifferentCacheKeys(String query1, String query2) {
+    List<PlanNode> cacheEligibleNodes1 = getCacheEligibleNodes(query1);
+    List<PlanNode> cacheEligibleNodes2 = getCacheEligibleNodes(query2);
+
+    // None of this makes any sense unless they both have eligible nodes
+    assertTrue(cacheEligibleNodes1.size() > 0);
+    assertTrue(cacheEligibleNodes2.size() > 0);
+
+    Set<String> keyIntersection = intersection(
+        getCacheKeys(cacheEligibleNodes1), getCacheKeys(cacheEligibleNodes2));
+    Set<String> hashTraceIntersection = intersection(
+        getCacheHashTraces(cacheEligibleNodes1), getCacheHashTraces(cacheEligibleNodes2));
+
+    // The hash trace for a cache key should be a one-to-one thing, so
+    // any difference in keys should be a difference in hash traces.
+    assertEquals(keyIntersection.size(), hashTraceIntersection.size());
+
+    if (!keyIntersection.isEmpty() || !hashTraceIntersection.isEmpty()) {
+      failWithEligibleNodes("Expected different cache keys. Instead found:\n",
+          query1, cacheEligibleNodes1, query2, cacheEligibleNodes2);
+    }
+  }
+
+  /**
+   * Given a query, run the planner and capture the physical plan prior
+   * to conversion to Thrift. This currently runs with num_nodes=1
+   * and tuple caching enabled. This was adapted from CardinalityTest.
+   * Fails the test if planning throws an ImpalaException.
+   *
+   * @param query the query to run
+   * @return the list of plan fragments captured from the planner
+   */
+  private List<PlanFragment> getPlan(String query) {
+    // Create a query context with rewrites disabled
+    // TODO: Should probably turn them on, or run a test
+    // both with and without rewrites.
+    TQueryCtx queryCtx = TestUtils.createQueryContext(
+        "default", System.getProperty("user.name"));
+    queryCtx.client_request.setStmt(query);
+
+    // Force the plan to run on a single node so it
+    // resides in a single fragment.
+    TQueryOptions queryOptions = queryCtx.client_request.getQuery_options();
+    queryOptions.setNum_nodes(1);
+    // Turn on tuple caching
+    queryOptions.setEnable_tuple_cache(true);
+
+    // Plan the query, discard the actual execution plan, and
+    // return the plan tree.
+    PlanCtx planCtx = new PlanCtx(queryCtx);
+    planCtx.requestPlanCapture();
+    try {
+      frontend_.createExecRequest(planCtx);
+    } catch (ImpalaException e) {
+      fail(e.getMessage());
+    }
+    return planCtx.getPlan();
+  }
+}
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test 
b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
index 26d4aef9d..cb006613b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level1.test
@@ -46,4 +46,19 @@ explain select year, month from functional.alltypes
 select year, month from functional.alltypes where year=2009;
 ---- RESULTS: VERIFY_IS_SUBSET
 '04:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]'
-====
\ No newline at end of file
+====
+---- QUERY
+# Tests for no cache key without ENABLE_TUPLE_CACHE
+explain select count(*) from tpch.region
+---- RESULTS: VERIFY_IS_NOT_IN
+row_regex:.* cache key: [0-9a-f][0-9a-f]*.*
+row_regex:.*\[TPlanNode\(.*\]
+====
+---- QUERY
+# Tests for cache key and trace with ENABLE_TUPLE_CACHE
+set ENABLE_TUPLE_CACHE=TRUE;
+explain select count(*) from tpch.region
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:.* cache key: [0-9a-f][0-9a-f]*.*
+row_regex:.*\[TPlanNode\(.*\]
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test 
b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index 8b72f4a69..8b6024608 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -94,3 +94,11 @@ select year, month from functional.alltypes where year=2009;
 'Per-Host Resources: mem-estimate=13.98MB mem-reservation=5.88MB 
thread-reservation=1 runtime-filters-memory=2.00MB'
 '04:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]'
 ====
+---- QUERY
+# Tests for cache key and trace with ENABLE_TUPLE_CACHE=TRUE
+set ENABLE_TUPLE_CACHE=TRUE;
+explain select count(*) from tpch.region
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:.* cache key: [0-9a-f][0-9a-f]*.*
+row_regex:.*\[TPlanNode\(.*\]
+====

Reply via email to