http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index aba4a26..5269fe5 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -26,22 +26,23 @@ #include "common/logging.h" #include "common/object-pool.h" #include "exec/data-sink.h" -#include "exec/exec-node.h" #include "exec/exchange-node.h" -#include "exec/scan-node.h" -#include "exec/hdfs-scan-node.h" +#include "exec/exec-node.h" #include "exec/hbase-table-scanner.h" +#include "exec/hdfs-scan-node.h" +#include "exec/plan-root-sink.h" +#include "exec/scan-node.h" #include "exprs/expr.h" -#include "runtime/descriptors.h" #include "runtime/data-stream-mgr.h" +#include "runtime/descriptors.h" +#include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-filter-bank.h" -#include "runtime/mem-tracker.h" +#include "util/container-util.h" #include "util/cpu-info.h" #include "util/debug-util.h" -#include "util/container-util.h" -#include "util/parse-util.h" #include "util/mem-info.h" +#include "util/parse-util.h" #include "util/periodic-counter-updater.h" #include "util/pretty-printer.h" @@ -60,28 +61,45 @@ namespace impala { const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage"; -PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, - const ReportStatusCallback& report_status_cb) : - exec_env_(exec_env), plan_(NULL), report_status_cb_(report_status_cb), - report_thread_active_(false), done_(false), closed_(false), - has_thread_token_(false), is_prepared_(false), is_cancelled_(false), - average_thread_tokens_(NULL), mem_usage_sampled_counter_(NULL), - thread_usage_sampled_counter_(NULL) { -} +PlanFragmentExecutor::PlanFragmentExecutor( + ExecEnv* exec_env, const 
ReportStatusCallback& report_status_cb) + : exec_env_(exec_env), + exec_tree_(NULL), + report_status_cb_(report_status_cb), + report_thread_active_(false), + closed_(false), + has_thread_token_(false), + is_prepared_(false), + is_cancelled_(false), + average_thread_tokens_(NULL), + mem_usage_sampled_counter_(NULL), + thread_usage_sampled_counter_(NULL) {} PlanFragmentExecutor::~PlanFragmentExecutor() { - Close(); + DCHECK(!is_prepared_ || closed_); // at this point, the report thread should have been stopped DCHECK(!report_thread_active_); } Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { + Status status = PrepareInternal(request); + prepared_promise_.Set(status); + return status; +} + +Status PlanFragmentExecutor::WaitForOpen() { + DCHECK(prepared_promise_.IsSet()) << "Prepare() must complete before WaitForOpen()"; + RETURN_IF_ERROR(prepared_promise_.Get()); + return opened_promise_.Get(); +} + +Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& request) { lock_guard<mutex> l(prepare_lock_); DCHECK(!is_prepared_); if (is_cancelled_) return Status::CANCELLED; - is_prepared_ = true; + // TODO: Break this method up. fragment_sw_.Start(); const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx; @@ -100,6 +118,10 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { // total_time_counter() is in the runtime_state_ so start it up now. SCOPED_TIMER(profile()->total_time_counter()); + timings_profile_ = + obj_pool()->Add(new RuntimeProfile(obj_pool(), "PlanFragmentExecutor")); + profile()->AddChild(timings_profile_); + SCOPED_TIMER(ADD_TIMER(timings_profile_, "PrepareTime")); // reservation or a query option. 
int64_t bytes_limit = -1; @@ -145,22 +167,22 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { // set up plan DCHECK(request.__isset.fragment_ctx); - RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_.get(), - request.fragment_ctx.fragment.plan, *desc_tbl, &plan_)); - runtime_state_->set_fragment_root_id(plan_->id()); + RETURN_IF_ERROR(ExecNode::CreateTree( + runtime_state_.get(), request.fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_)); + runtime_state_->set_fragment_root_id(exec_tree_->id()); if (fragment_instance_ctx.__isset.debug_node_id) { DCHECK(fragment_instance_ctx.__isset.debug_action); DCHECK(fragment_instance_ctx.__isset.debug_phase); ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id, - fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action, plan_); + fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action, + exec_tree_); } // set #senders of exchange nodes before calling Prepare() vector<ExecNode*> exch_nodes; - plan_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes); - for (ExecNode* exch_node: exch_nodes) - { + exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes); + for (ExecNode* exch_node : exch_nodes) { DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE); int num_senders = FindWithDefault(fragment_instance_ctx.per_exch_num_senders, exch_node->id(), 0); @@ -171,7 +193,7 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { // set scan ranges vector<ExecNode*> scan_nodes; vector<TScanRangeParams> no_scan_ranges; - plan_->CollectScanNodes(&scan_nodes); + exec_tree_->CollectScanNodes(&scan_nodes); for (int i = 0; i < scan_nodes.size(); ++i) { ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]); const vector<TScanRangeParams>& scan_ranges = FindWithDefault( @@ -179,42 +201,47 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { scan_node->SetScanRanges(scan_ranges); } - 
RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "PrepareTime"); + RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "ExecTreePrepareTime"); { SCOPED_TIMER(prepare_timer); - RETURN_IF_ERROR(plan_->Prepare(runtime_state_.get())); + RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_.get())); } PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges); - // set up sink, if required - if (request.fragment_ctx.fragment.__isset.output_sink) { - RETURN_IF_ERROR(DataSink::CreateDataSink( - obj_pool(), request.fragment_ctx.fragment.output_sink, - request.fragment_ctx.fragment.output_exprs, - fragment_instance_ctx, row_desc(), &sink_)); - sink_mem_tracker_.reset(new MemTracker( - -1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true)); - RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get())); - - RuntimeProfile* sink_profile = sink_->profile(); - if (sink_profile != NULL) { - profile()->AddChild(sink_profile); - } - } else { - sink_.reset(NULL); + DCHECK(request.fragment_ctx.fragment.__isset.output_sink); + RETURN_IF_ERROR( + DataSink::CreateDataSink(obj_pool(), request.fragment_ctx.fragment.output_sink, + request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx, + exec_tree_->row_desc(), &sink_)); + sink_mem_tracker_.reset( + new MemTracker(-1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true)); + RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get())); + + RuntimeProfile* sink_profile = sink_->profile(); + if (sink_profile != NULL) { + profile()->AddChild(sink_profile); + } + + if (request.fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) { + root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get()); + // Release the thread token on the root fragment instance. This fragment spends most + // of the time waiting and doing very little work. Holding on to the token causes + // underutilization of the machine. 
If there are 12 queries on this node, that's 12 + // tokens reserved for no reason. + ReleaseThreadToken(); } // set up profile counters - profile()->AddChild(plan_->runtime_profile()); + profile()->AddChild(exec_tree_->runtime_profile()); rows_produced_counter_ = ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT); per_host_mem_usage_ = ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES); - row_batch_.reset(new RowBatch(plan_->row_desc(), runtime_state_->batch_size(), - runtime_state_->instance_mem_tracker())); - VLOG(2) << "plan_root=\n" << plan_->DebugString(); + row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(), + runtime_state_->instance_mem_tracker())); + VLOG(2) << "plan_root=\n" << exec_tree_->DebugString(); return Status::OK(); } @@ -251,12 +278,21 @@ void PlanFragmentExecutor::PrintVolumeIds( } Status PlanFragmentExecutor::Open() { - VLOG_QUERY << "Open(): instance_id=" - << runtime_state_->fragment_instance_id(); + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(ADD_TIMER(timings_profile_, "OpenTime")); + VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id(); + Status status = OpenInternal(); + UpdateStatus(status); + opened_promise_.Set(status); + return status; +} - RETURN_IF_ERROR(runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get())); +Status PlanFragmentExecutor::OpenInternal() { + RETURN_IF_ERROR( + runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get())); - // we need to start the profile-reporting thread before calling Open(), since it + // we need to start the profile-reporting thread before calling exec_tree_->Open(), + // since it // may block if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) { unique_lock<mutex> l(report_thread_lock_); @@ -271,22 +307,25 @@ Status PlanFragmentExecutor::Open() { OptimizeLlvmModule(); - Status status = OpenInternal(); - if (sink_.get() != NULL) { - // We call 
Close() here rather than in OpenInternal() because we want to make sure - // that Close() gets called even if there was an error in OpenInternal(). - // We also want to call sink_->Close() here rather than in PlanFragmentExecutor::Close - // because we do not want the sink_ to hold on to all its resources as we will never - // use it after this. - sink_->Close(runtime_state()); - // If there's a sink and no error, OpenInternal() completed the fragment execution. - if (status.ok()) { - done_ = true; - FragmentComplete(); - } + { + SCOPED_TIMER(ADD_TIMER(timings_profile_, "ExecTreeOpenTime")); + RETURN_IF_ERROR(exec_tree_->Open(runtime_state_.get())); } + return sink_->Open(runtime_state_.get()); +} - if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) { +Status PlanFragmentExecutor::Exec() { + SCOPED_TIMER(ADD_TIMER(timings_profile_, "ExecTime")); + { + lock_guard<mutex> l(status_lock_); + RETURN_IF_ERROR(status_); + } + Status status = ExecInternal(); + + // If there's no error, ExecInternal() completed the fragment instance's execution. + if (status.ok()) { + FragmentComplete(); + } else if (!status.IsCancelled() && !status.IsMemLimitExceeded()) { // Log error message in addition to returning in Status. Queries that do not // fetch results (e.g. insert) may not receive the message directly and can // only retrieve the log. 
@@ -296,21 +335,23 @@ Status PlanFragmentExecutor::Open() { return status; } -Status PlanFragmentExecutor::OpenInternal() { - SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR(plan_->Open(runtime_state_.get())); - if (sink_.get() == NULL) return Status::OK(); - - // If there is a sink, do all the work of driving it here, so that - // when this returns the query has actually finished - RETURN_IF_ERROR(sink_->Open(runtime_state_.get())); - while (!done_) { +Status PlanFragmentExecutor::ExecInternal() { + RuntimeProfile::Counter* plan_exec_timer = + ADD_TIMER(timings_profile_, "ExecTreeExecTime"); + bool exec_tree_complete = false; + do { + Status status; row_batch_->Reset(); - RETURN_IF_ERROR(plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_)); - if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::OpenInternal()"); + { + SCOPED_TIMER(plan_exec_timer); + status = exec_tree_->GetNext( + runtime_state_.get(), row_batch_.get(), &exec_tree_complete); + } + if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::ExecInternal()"); COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows()); + RETURN_IF_ERROR(status); RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get())); - } + } while (!exec_tree_complete); // Flush the sink *before* stopping the report thread. Flush may need to add some // important information to the last report that gets sent. (e.g. table sinks record the @@ -376,13 +417,20 @@ void PlanFragmentExecutor::SendReport(bool done) { status = status_; } + // If status is not OK, we need to make sure that only one sender sends a 'done' + // response. + // TODO: Clean all this up - move 'done' reporting to Close()? + if (!done && !status.ok()) { + done = completed_report_sent_.CompareAndSwap(0, 1); + } + // Update the counter for the peak per host mem usage. per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption()); // This will send a report even if we are cancelled. 
If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will // be waiting for a final report and profile. - report_status_cb_(status, profile(), done || !status.ok()); + report_status_cb_(status, profile(), done); } void PlanFragmentExecutor::StopReportThread() { @@ -395,36 +443,6 @@ void PlanFragmentExecutor::StopReportThread() { report_thread_->Join(); } -Status PlanFragmentExecutor::GetNext(RowBatch** batch) { - SCOPED_TIMER(profile()->total_time_counter()); - VLOG_FILE << "GetNext(): instance_id=" << runtime_state_->fragment_instance_id(); - - Status status = Status::OK(); - row_batch_->Reset(); - // Loop until we've got a non-empty batch, hit an error or exhausted the input. - while (!done_) { - status = plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_); - if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::GetNext()"); - if (!status.ok()) break; - if (row_batch_->num_rows() > 0) break; - row_batch_->Reset(); - } - UpdateStatus(status); - COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows()); - - if (done_) { - VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_) - << " instance_id=" << PrintId(runtime_state_->fragment_instance_id()); - FragmentComplete(); - // Once all rows are returned, signal that we're done with an empty batch. - *batch = row_batch_->num_rows() == 0 ? NULL : row_batch_.get(); - return status; - } - - *batch = row_batch_.get(); - return status; -} - void PlanFragmentExecutor::FragmentComplete() { // Check the atomic flag. If it is set, then a fragment complete report has already // been sent. 
@@ -463,7 +481,7 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) { } void PlanFragmentExecutor::Cancel() { - VLOG_QUERY << "Cancelling plan fragment..."; + VLOG_QUERY << "Cancelling fragment instance..."; lock_guard<mutex> l(prepare_lock_); is_cancelled_ = true; if (!is_prepared_) { @@ -476,18 +494,10 @@ void PlanFragmentExecutor::Cancel() { runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id()); } -const RowDescriptor& PlanFragmentExecutor::row_desc() { - return plan_->row_desc(); -} - RuntimeProfile* PlanFragmentExecutor::profile() { return runtime_state_->runtime_profile(); } -bool PlanFragmentExecutor::ReachedLimit() { - return plan_->ReachedLimit(); -} - void PlanFragmentExecutor::ReleaseThreadToken() { if (has_thread_token_) { has_thread_token_ = false; @@ -500,19 +510,23 @@ void PlanFragmentExecutor::ReleaseThreadToken() { void PlanFragmentExecutor::Close() { if (closed_) return; + if (!is_prepared_) return; + if (sink_.get() != nullptr) sink_->Close(runtime_state()); + row_batch_.reset(); if (sink_mem_tracker_ != NULL) { sink_mem_tracker_->UnregisterFromParent(); sink_mem_tracker_.reset(); } - // Prepare may not have been called, which sets runtime_state_ - if (runtime_state_.get() != NULL) { - if (plan_ != NULL) plan_->Close(runtime_state_.get()); - runtime_state_->UnregisterReaderContexts(); - exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool()); - runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get()); - runtime_state_->filter_bank()->Close(); - } + + // Prepare should always have been called, and so runtime_state_ should be set + DCHECK(prepared_promise_.IsSet()); + if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get()); + runtime_state_->UnregisterReaderContexts(); + exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool()); + runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get()); + runtime_state_->filter_bank()->Close(); + if 
(mem_usage_sampled_counter_ != NULL) { PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_); mem_usage_sampled_counter_ = NULL;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/plan-fragment-executor.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h index f4355ea..82d3001 100644 --- a/be/src/runtime/plan-fragment-executor.h +++ b/be/src/runtime/plan-fragment-executor.h @@ -23,9 +23,10 @@ #include <boost/scoped_ptr.hpp> #include <boost/function.hpp> -#include "common/status.h" #include "common/object-pool.h" +#include "common/status.h" #include "runtime/runtime-state.h" +#include "util/promise.h" #include "util/runtime-profile-counters.h" #include "util/thread.h" @@ -33,6 +34,7 @@ namespace impala { class HdfsFsCache; class ExecNode; +class PlanRootSink; class RowDescriptor; class RowBatch; class DataSink; @@ -45,21 +47,28 @@ class TPlanFragment; class TPlanExecParams; /// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment, -/// including setup and tear-down, both in the success and error case. -/// Tear-down frees all memory allocated for this plan fragment and closes all data -/// streams; it happens automatically in the d'tor. -// -/// The executor makes an aggregated profile for the entire fragment available, -/// which includes profile information for the plan itself as well as the output -/// sink, if any. +/// including setup and tear-down, both in the success and error case. Tear-down, which +/// happens in Close(), frees all memory allocated for this plan fragment and closes all +/// data streams. +/// +/// The lifecycle of a PlanFragmentExecutor is as follows: +/// if (Prepare().ok()) { +/// Open() +/// Exec() +/// } +/// Close() +/// +/// The executor makes an aggregated profile for the entire fragment available, which +/// includes profile information for the plan itself as well as the output sink. 
+/// /// The ReportStatusCallback passed into the c'tor is invoked periodically to report the /// execution status. The frequency of those reports is controlled by the flag /// status_report_interval; setting that flag to 0 disables periodic reporting altogether -/// Regardless of the value of that flag, if a report callback is specified, it is -/// invoked at least once at the end of execution with an overall status and profile -/// (and 'done' indicator). The only exception is when execution is cancelled, in which -/// case the callback is *not* invoked (the coordinator already knows that execution -/// stopped, because it initiated the cancellation). +/// Regardless of the value of that flag, if a report callback is specified, it is invoked +/// at least once at the end of execution with an overall status and profile (and 'done' +/// indicator). The only exception is when execution is cancelled, in which case the +/// callback is *not* invoked (the coordinator already knows that execution stopped, +/// because it initiated the cancellation). // /// Aside from Cancel(), which may be called asynchronously, this class is not /// thread-safe. @@ -76,49 +85,37 @@ class PlanFragmentExecutor { ReportStatusCallback; /// report_status_cb, if !empty(), is used to report the accumulated profile - /// information periodically during execution (Open() or GetNext()). + /// information periodically during execution. PlanFragmentExecutor(ExecEnv* exec_env, const ReportStatusCallback& report_status_cb); - /// Closes the underlying plan fragment and frees up all resources allocated - /// in Open()/GetNext(). - /// It is an error to delete a PlanFragmentExecutor with a report callback - /// before Open()/GetNext() (depending on whether the fragment has a sink) - /// indicated that execution is finished. 
+ /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec() + /// indicated that execution is finished, or to delete one that has not been Close()'d + /// if Prepare() has been called. ~PlanFragmentExecutor(); /// Prepare for execution. Call this prior to Open(). /// - /// runtime_state() and row_desc() will not be valid until Prepare() is - /// called. runtime_state() will always be valid after Prepare() returns, unless the - /// query was cancelled before Prepare() was called. If request.query_options.mem_limit - /// > 0, it is used as an approximate limit on the number of bytes this query can - /// consume at runtime. The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over - /// that limit. + /// runtime_state() will not be valid until Prepare() is called. runtime_state() will + /// always be valid after Prepare() returns, unless the query was cancelled before + /// Prepare() was called. If request.query_options.mem_limit > 0, it is used as an + /// approximate limit on the number of bytes this query can consume at runtime. The + /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit. /// /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns /// Status::CANCELLED; Status Prepare(const TExecPlanFragmentParams& request); - /// Start execution. Call this prior to GetNext(). - /// If this fragment has a sink, Open() will send all rows produced - /// by the fragment to that sink. Therefore, Open() may block until - /// all rows are produced (and a subsequent call to GetNext() will not return - /// any rows). - /// This also starts the status-reporting thread, if the interval flag - /// is > 0 and a callback was specified in the c'tor. - /// If this fragment has a sink, report_status_cb will have been called for the final - /// time when Open() returns, and the status-reporting thread will have been stopped. + /// Opens the fragment plan and sink. 
Starts the profile reporting thread, if required. Status Open(); - /// Return results through 'batch'. Sets '*batch' to NULL if no more results. - /// '*batch' is owned by PlanFragmentExecutor and must not be deleted. - /// When *batch == NULL, GetNext() should not be called anymore. Also, report_status_cb - /// will have been called for the final time and the status-reporting thread - /// will have been stopped. - Status GetNext(RowBatch** batch); + /// Executes the fragment by repeatedly driving the sink with batches produced by the + /// exec node tree. report_status_cb will have been called for the final time when + /// Exec() returns, and the status-reporting thread will have been stopped. + Status Exec(); - /// Closes the underlying plan fragment and frees up all resources allocated - /// in Open()/GetNext(). + /// Closes the underlying plan fragment and frees up all resources allocated in + /// Prepare() and Open(). Must be called if Prepare() has been called - no matter + /// whether or not Prepare() succeeded. void Close(); /// Initiate cancellation. If called concurrently with Prepare(), will wait for @@ -131,25 +128,30 @@ class PlanFragmentExecutor { /// It is legal to call Cancel() if Prepare() returned an error. void Cancel(); - /// Returns true if this query has a limit and it has been reached. - bool ReachedLimit(); - - /// Releases the thread token for this fragment executor. - void ReleaseThreadToken(); - /// call these only after Prepare() RuntimeState* runtime_state() { return runtime_state_.get(); } - const RowDescriptor& row_desc(); /// Profile information for plan and output sink. RuntimeProfile* profile(); + /// Blocks until Prepare() is completed. + Status WaitForPrepare() { return prepared_promise_.Get(); } + + /// Blocks until exec tree and sink are both opened. It is an error to call this before + /// Prepare() has completed. If Prepare() returned an error, WaitForOpen() will + /// return that error without blocking. 
+ Status WaitForOpen(); + + /// Returns fragment instance's sink if this is the root fragment instance. Valid after + /// Prepare() returns; if Prepare() fails may be nullptr. + PlanRootSink* root_sink() { return root_sink_; } + /// Name of the counter that is tracking per query, per host peak mem usage. static const std::string PER_HOST_PEAK_MEM_COUNTER; private: ExecEnv* exec_env_; // not owned - ExecNode* plan_; // lives in runtime_state_->obj_pool() + ExecNode* exec_tree_; // lives in runtime_state_->obj_pool() TUniqueId query_id_; /// profile reporting-related @@ -166,9 +168,6 @@ class PlanFragmentExecutor { boost::condition_variable report_thread_started_cv_; bool report_thread_active_; // true if we started the thread - /// true if plan_->GetNext() indicated that it's done - bool done_; - /// true if Close() has been called bool closed_; @@ -190,14 +189,20 @@ class PlanFragmentExecutor { /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to /// the dtor of 'runtime_state_'. boost::scoped_ptr<RuntimeState> runtime_state_; - /// Output sink for rows sent to this fragment. May not be set, in which case rows are - /// returned via GetNext's row batch - /// Created in Prepare (if required), owned by this object. + + /// Profile for timings for each stage of the plan fragment instance's lifecycle. + RuntimeProfile* timings_profile_; + + /// Output sink for rows sent to this fragment. Created in Prepare(), owned by this + /// object. boost::scoped_ptr<DataSink> sink_; boost::scoped_ptr<MemTracker> sink_mem_tracker_; + /// Set if this fragment instance is the root of the entire plan, so that a consumer can + /// pull results by calling root_sink_->GetNext(). Same object as sink_. 
+ PlanRootSink* root_sink_ = nullptr; + boost::scoped_ptr<RowBatch> row_batch_; - boost::scoped_ptr<TRowBatch> thrift_batch_; /// Protects is_prepared_ and is_cancelled_, and is also used to coordinate between /// Prepare() and Cancel() to ensure mutual exclusion. @@ -207,6 +212,12 @@ class PlanFragmentExecutor { /// error. If Cancel() was called before Prepare(), is_prepared_ will not be set. bool is_prepared_; + /// Set when Prepare() returns. + Promise<Status> prepared_promise_; + + /// Set when OpenInternal() returns. + Promise<Status> opened_promise_; + /// True if and only if Cancel() has been called. bool is_cancelled_; @@ -267,21 +278,25 @@ class PlanFragmentExecutor { void FragmentComplete(); /// Optimizes the code-generated functions in runtime_state_->llvm_codegen(). - /// Must be called between plan_->Prepare() and plan_->Open(). - /// This is somewhat time consuming so we don't want it to do it in - /// PlanFragmentExecutor()::Prepare() to allow starting plan fragments more - /// quickly and in parallel (in a deep plan tree, the fragments are started - /// in level order). + /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open(). void OptimizeLlvmModule(); /// Executes Open() logic and returns resulting status. Does not set status_. - /// If this plan fragment has no sink, OpenInternal() does nothing. - /// If this plan fragment has a sink and OpenInternal() returns without an - /// error condition, all rows will have been sent to the sink, the sink will - /// have been closed, a final report will have been sent and the report thread will - /// have been stopped. sink_ will be set to NULL after successful execution. Status OpenInternal(); + /// Pulls row batches from fragment instance and pushes them to sink_ in a loop. Returns + /// OK if the input was exhausted and sent to the sink successfully, an error otherwise. 
+ /// If ExecInternal() returns without an error condition, all rows will have been sent + /// to the sink, the sink will have been closed, a final report will have been sent and + /// the report thread will have been stopped. + Status ExecInternal(); + + /// Performs all the logic of Prepare() and returns resulting status. + Status PrepareInternal(const TExecPlanFragmentParams& request); + + /// Releases the thread token for this fragment executor. + void ReleaseThreadToken(); + /// Stops report thread, if one is running. Blocks until report thread terminates. /// Idempotent. void StopReportThread(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/query-schedule.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc index 5ad84df..1eb36e3 100644 --- a/be/src/scheduling/query-schedule.cc +++ b/be/src/scheduling/query-schedule.cc @@ -198,36 +198,28 @@ const TPlanFragment& FInstanceExecParams::fragment() const { } int QuerySchedule::GetNumFragmentInstances() const { - if (mt_fragment_exec_params_.empty()) return num_fragment_instances_; int result = 0; - for (const MtFragmentExecParams& fragment_exec_params: mt_fragment_exec_params_) { - result += fragment_exec_params.instance_exec_params.size(); + if (mt_fragment_exec_params_.empty()) { + DCHECK(!fragment_exec_params_.empty()); + for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) { + result += fragment_exec_params.hosts.size(); + } + } else { + for (const MtFragmentExecParams& fragment_exec_params : mt_fragment_exec_params_) { + result += fragment_exec_params.instance_exec_params.size(); + } } return result; } -int QuerySchedule::GetNumRemoteFInstances() const { - bool has_coordinator_fragment = GetCoordFragment() != nullptr; - int result = GetNumFragmentInstances(); - bool is_mt_execution = request_.query_ctx.request.query_options.mt_dop > 0; - 
if (is_mt_execution && has_coordinator_fragment) --result; - return result; -} - -int QuerySchedule::GetTotalFInstances() const { - int result = GetNumRemoteFInstances(); - return GetCoordFragment() != nullptr ? result + 1 : result; -} - const TPlanFragment* QuerySchedule::GetCoordFragment() const { + // Only have coordinator fragment for statements that return rows. + if (request_.stmt_type != TStmtType::QUERY) return nullptr; bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0; const TPlanFragment* fragment = is_mt_exec ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0]; - if (fragment->partition.type == TPartitionType::UNPARTITIONED) { + return fragment; - } else { - return nullptr; - } } void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index 39ce268..77c9cd6 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -140,34 +140,9 @@ class QuerySchedule { /// Helper methods used by scheduler to populate this QuerySchedule. void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; } - /// The following 4 functions need to be replaced once we stop special-casing - /// the coordinator instance in the coordinator. - /// The replacement is a single function int GetNumFInstances() (which includes - /// the coordinator instance). - - /// TODO-MT: remove; this is actually only the number of remote instances - /// (from the coordinator's perspective) - void set_num_fragment_instances(int64_t num_fragment_instances) { - num_fragment_instances_ = num_fragment_instances; - } - - /// Returns the number of fragment instances registered with this schedule. 
- /// MT: total number of fragment instances - /// ST: value set with set_num_fragment_instances(); excludes coord instance - /// (in effect the number of remote instances) - /// TODO-MT: get rid of special-casing of coordinator instance and always return the - /// total + /// Returns the total number of fragment instances. int GetNumFragmentInstances() const; - /// Returns the total number of fragment instances, incl. coordinator fragment. - /// TODO-MT: remove - int GetTotalFInstances() const; - - /// Returns the number of remote fragment instances (excludes coordinator). - /// Works for both MT and ST. - /// TODO-MT: remove - int GetNumRemoteFInstances() const; - /// Return the coordinator fragment, or nullptr if there isn't one. const TPlanFragment* GetCoordFragment() const; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/simple-scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc index 5b6303e..9b52d5a 100644 --- a/be/src/scheduling/simple-scheduler.cc +++ b/be/src/scheduling/simple-scheduler.cc @@ -663,11 +663,6 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re CreateInstanceId(schedule->query_id(), num_fragment_instances)); } } - if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) { - // the root fragment is executed directly by the coordinator - --num_fragment_instances; - } - schedule->set_num_fragment_instances(num_fragment_instances); // compute destinations and # senders per exchange node // (the root fragment doesn't have a destination) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc index 76e11d1..cc56c19 100644 
--- a/be/src/service/fragment-exec-state.cc +++ b/be/src/service/fragment-exec-state.cc @@ -54,8 +54,10 @@ Status FragmentMgr::FragmentExecState::Prepare() { } void FragmentMgr::FragmentExecState::Exec() { - // Open() does the full execution, because all plan fragments have sinks - executor_.Open(); + if (Prepare().ok()) { + executor_.Open(); + executor_.Exec(); + } executor_.Close(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h index 6cff7ce..c795cd8 100644 --- a/be/src/service/fragment-exec-state.h +++ b/be/src/service/fragment-exec-state.h @@ -47,9 +47,6 @@ class FragmentMgr::FragmentExecState { /// the fragment and returns OK. Status Cancel(); - /// Call Prepare() and create and initialize data sink. - Status Prepare(); - /// Main loop of plan fragment execution. Blocks until execution finishes. void Exec(); @@ -67,6 +64,8 @@ class FragmentMgr::FragmentExecState { /// Publishes filter with ID 'filter_id' to this fragment's filter bank. void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter); + PlanFragmentExecutor* executor() { return &executor_; } + private: TQueryCtx query_ctx_; TPlanFragmentInstanceCtx fragment_instance_ctx_; @@ -98,6 +97,9 @@ class FragmentMgr::FragmentExecState { /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created /// for this fragment (e.g. when the fragment has failed during preparation). void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done); + + /// Call Prepare() and create and initialize data sink. 
+ Status Prepare(); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc index 64e9a78..8e8fc05 100644 --- a/be/src/service/fragment-mgr.cc +++ b/be/src/service/fragment-mgr.cc @@ -54,9 +54,6 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) return process_mem_tracker->MemLimitExceeded(NULL, msg, 0); } - // Remote fragments must always have a sink. Remove when IMPALA-2905 is resolved. - DCHECK(exec_params.fragment_ctx.fragment.__isset.output_sink); - shared_ptr<FragmentExecState> exec_state( new FragmentExecState(exec_params, ExecEnv::GetInstance())); @@ -64,6 +61,8 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) // only happen after this RPC returns) can always find this fragment. { lock_guard<SpinLock> l(fragment_exec_state_map_lock_); + DCHECK(fragment_exec_state_map_.find(exec_state->fragment_instance_id()) + == fragment_exec_state_map_.end()); fragment_exec_state_map_.insert( make_pair(exec_state->fragment_instance_id(), exec_state)); } @@ -84,8 +83,7 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) void FragmentMgr::FragmentThread(TUniqueId fragment_instance_id) { shared_ptr<FragmentExecState> exec_state = GetFragmentExecState(fragment_instance_id); if (exec_state.get() == NULL) return; - Status status = exec_state->Prepare(); - if (status.ok()) exec_state->Exec(); + exec_state->Exec(); // We're done with this plan fragment { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index 3daa36b..ee7f958 100644 --- 
a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -17,55 +17,19 @@ #include "service/impala-server.h" -#include <algorithm> #include <boost/algorithm/string/join.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp> -#include <boost/unordered_set.hpp> -#include <jni.h> -#include <thrift/protocol/TDebugProtocol.h> -#include <gtest/gtest.h> -#include <boost/bind.hpp> -#include <boost/algorithm/string.hpp> -#include <gperftools/heap-profiler.h> -#include <gperftools/malloc_extension.h> - -#include "codegen/llvm-codegen.h" + #include "common/logging.h" -#include "common/version.h" -#include "exec/exec-node.h" -#include "exec/hdfs-table-sink.h" -#include "exec/scan-node.h" -#include "exprs/expr.h" -#include "runtime/data-stream-mgr.h" -#include "runtime/client-cache.h" -#include "runtime/descriptors.h" -#include "runtime/data-stream-sender.h" -#include "runtime/row-batch.h" -#include "runtime/plan-fragment-executor.h" -#include "runtime/hdfs-fs-cache.h" +#include "gen-cpp/Frontend_types.h" +#include "rpc/thrift-util.h" #include "runtime/exec-env.h" -#include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" #include "runtime/timestamp-value.h" -#include "scheduling/simple-scheduler.h" #include "service/query-exec-state.h" #include "service/query-options.h" -#include "util/container-util.h" -#include "util/debug-util.h" +#include "service/query-result-set.h" #include "util/impalad-metrics.h" -#include "util/string-parser.h" -#include "rpc/thrift-util.h" -#include "rpc/thrift-server.h" -#include "util/jni-util.h" #include "util/webserver.h" -#include "gen-cpp/Types_types.h" -#include "gen-cpp/ImpalaService.h" -#include "gen-cpp/DataSinks_types.h" -#include "gen-cpp/Types_types.h" -#include "gen-cpp/ImpalaService.h" -#include "gen-cpp/ImpalaService_types.h" -#include "gen-cpp/ImpalaInternalService.h" -#include "gen-cpp/Frontend_types.h" #include "common/names.h" @@ -83,11 +47,17 @@ using namespace beeswax; } \ 
} while (false) +namespace { + +/// Ascii output precision for double/float +constexpr int ASCII_PRECISION = 16; +} + namespace impala { // Ascii result set for Beeswax. // Beeswax returns rows in ascii, using "\t" as column delimiter. -class ImpalaServer::AsciiQueryResultSet : public ImpalaServer::QueryResultSet { +class AsciiQueryResultSet : public QueryResultSet { public: // Rows are added into rowset. AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index ee79b4b..de0e2f3 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -36,13 +36,14 @@ #include "exprs/expr.h" #include "rpc/thrift-util.h" #include "runtime/raw-value.h" +#include "service/hs2-util.h" #include "service/query-exec-state.h" #include "service/query-options.h" +#include "service/query-result-set.h" #include "util/debug-util.h" -#include "util/runtime-profile-counters.h" #include "util/impalad-metrics.h" +#include "util/runtime-profile-counters.h" #include "util/string-parser.h" -#include "service/hs2-util.h" #include "common/names.h" @@ -129,7 +130,7 @@ static TOperationState::type QueryStateToTOperationState( // Result set container for Hive protocol versions >= V6, where results are returned in // column-orientation. 
-class ImpalaServer::HS2ColumnarResultSet : public ImpalaServer::QueryResultSet { +class HS2ColumnarResultSet : public QueryResultSet { public: HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL) : metadata_(metadata), result_set_(rowset), num_rows_(0) { @@ -317,7 +318,7 @@ class ImpalaServer::HS2ColumnarResultSet : public ImpalaServer::QueryResultSet { }; // TRow result set for HiveServer2 -class ImpalaServer::HS2RowOrientedResultSet : public ImpalaServer::QueryResultSet { +class HS2RowOrientedResultSet : public QueryResultSet { public: // Rows are added into rowset. HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL) @@ -393,16 +394,6 @@ class ImpalaServer::HS2RowOrientedResultSet : public ImpalaServer::QueryResultSe scoped_ptr<TRowSet> owned_result_set_; }; -ImpalaServer::QueryResultSet* ImpalaServer::CreateHS2ResultSet( - TProtocolVersion::type version, const TResultSetMetadata& metadata, - TRowSet* rowset) { - if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) { - return new HS2RowOrientedResultSet(metadata, rowset); - } else { - return new HS2ColumnarResultSet(metadata, rowset); - } -} - void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) { TUniqueId session_id; @@ -482,6 +473,18 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); } +namespace { + +QueryResultSet* CreateHS2ResultSet( + TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) { + if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) { + return new HS2RowOrientedResultSet(metadata, rowset); + } else { + return new HS2ColumnarResultSet(metadata, rowset); + } +} +} + Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size, bool fetch_first, TFetchResultsResp* 
fetch_results) { shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); @@ -759,8 +762,9 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, // Optionally enable result caching on the QueryExecState. if (cache_num_rows > 0) { - status = exec_state->SetResultCache(CreateHS2ResultSet(session->hs2_version, - *exec_state->result_metadata()), cache_num_rows); + status = exec_state->SetResultCache( + CreateHS2ResultSet(session->hs2_version, *exec_state->result_metadata(), nullptr), + cache_num_rows); if (!status.ok()) { UnregisterQuery(exec_state->query_id(), false, &status); HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-internal-service.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h index a238f65..af54c35 100644 --- a/be/src/service/impala-internal-service.h +++ b/be/src/service/impala-internal-service.h @@ -18,8 +18,6 @@ #ifndef IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H #define IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H -#include <boost/shared_ptr.hpp> - #include "gen-cpp/ImpalaInternalService.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "service/impala-server.h" @@ -32,9 +30,12 @@ namespace impala { /// ImpalaInternalService service. 
class ImpalaInternalService : public ImpalaInternalServiceIf { public: - ImpalaInternalService(const boost::shared_ptr<ImpalaServer>& impala_server, - const boost::shared_ptr<FragmentMgr>& fragment_mgr) - : impala_server_(impala_server), fragment_mgr_(fragment_mgr) { } + ImpalaInternalService() { + impala_server_ = ExecEnv::GetInstance()->impala_server(); + DCHECK(impala_server_ != nullptr); + fragment_mgr_ = ExecEnv::GetInstance()->fragment_mgr(); + DCHECK(fragment_mgr_ != nullptr); + } virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) { @@ -74,10 +75,10 @@ class ImpalaInternalService : public ImpalaInternalServiceIf { private: /// Manages fragment reporting and data transmission - boost::shared_ptr<ImpalaServer> impala_server_; + ImpalaServer* impala_server_; /// Manages fragment execution - boost::shared_ptr<FragmentMgr> fragment_mgr_; + FragmentMgr* fragment_mgr_; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 7f9d862..bf83eec 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -206,7 +206,6 @@ const string HS2_SERVER_NAME = "hiveserver2-frontend"; const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000"; const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000"; const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00"; -const int ImpalaServer::ASCII_PRECISION = 16; // print 16 digits for double/float const int MAX_NM_MISSED_HEARTBEATS = 5; @@ -1866,9 +1865,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int } if (be_port != 0 && be_server != NULL) { - boost::shared_ptr<FragmentMgr> fragment_mgr(new FragmentMgr()); - boost::shared_ptr<ImpalaInternalService> thrift_if( - new 
ImpalaInternalService(handler, fragment_mgr)); + boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService()); boost::shared_ptr<TProcessor> be_processor( new ImpalaInternalServiceProcessor(thrift_if)); boost::shared_ptr<TProcessorEventHandler> event_handler( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 2104c5e..53f3384 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -249,45 +249,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, boost::scoped_ptr<ImpalaHttpHandler> http_handler_; - /// Query result set stores converted rows returned by QueryExecState.fetchRows(). It - /// provides an interface to convert Impala rows to external API rows. - /// It is an abstract class. Subclass must implement AddOneRow(). - class QueryResultSet { - public: - QueryResultSet() {} - virtual ~QueryResultSet() {} - - /// Add the row (list of expr value) from a select query to this result set. When a row - /// comes from a select query, the row is in the form of expr values (void*). 'scales' - /// contains the values' scales (# of digits after decimal), with -1 indicating no - /// scale specified. - virtual Status AddOneRow( - const std::vector<void*>& row, const std::vector<int>& scales) = 0; - - /// Add the TResultRow to this result set. When a row comes from a DDL/metadata - /// operation, the row in the form of TResultRow. - virtual Status AddOneRow(const TResultRow& row) = 0; - - /// Copies rows in the range [start_idx, start_idx + num_rows) from the other result - /// set into this result set. Returns the number of rows added to this result set. - /// Returns 0 if the given range is out of bounds of the other result set. 
- virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) = 0; - - /// Returns the approximate size of this result set in bytes. - int64_t ByteSize() { return ByteSize(0, size()); } - - /// Returns the approximate size of the given range of rows in bytes. - virtual int64_t ByteSize(int start_idx, int num_rows) = 0; - - /// Returns the size of this result set in number of rows. - virtual size_t size() = 0; - }; - - /// Result set implementations for Beeswax and HS2 - class AsciiQueryResultSet; - class HS2RowOrientedResultSet; - class HS2ColumnarResultSet; - struct SessionState; /// Execution state of a query. @@ -299,14 +260,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, static const char* SQLSTATE_GENERAL_ERROR; static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED; - /// Ascii output precision for double/float - static const int ASCII_PRECISION; - - QueryResultSet* CreateHS2ResultSet( - apache::hive::service::cli::thrift::TProtocolVersion::type version, - const TResultSetMetadata& metadata, - apache::hive::service::cli::thrift::TRowSet* rowset = NULL); - /// Return exec state for given query_id, or NULL if not found. /// If 'lock' is true, the returned exec state's lock() will be acquired before /// the query_exec_state_map_lock_ is released. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-exec-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index d55ac54..1532ecf 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -19,14 +19,15 @@ #include <limits> #include <gutil/strings/substitute.h> -#include "exprs/expr.h" #include "exprs/expr-context.h" +#include "exprs/expr.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" -#include "service/impala-server.h" #include "service/frontend.h" +#include "service/impala-server.h" #include "service/query-options.h" +#include "service/query-result-set.h" #include "util/debug-util.h" #include "util/impalad-metrics.h" #include "util/runtime-profile-counters.h" @@ -191,6 +192,7 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) { exec_request_.set_query_option_request.value, &session_->default_query_options, &session_->set_query_options_mask)); + SetResultSet({}, {}); } else { // "SET" returns a table of all query options. map<string, string> config; @@ -421,17 +423,10 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); } - // If desc_tbl is not set, query has SELECT with no FROM. In that - // case, the query can only have a single fragment, and that fragment needs to be - // executed by the coordinator. This check confirms that. - // If desc_tbl is set, the query may or may not have a coordinator fragment. bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0; const TPlanFragment& fragment = is_mt_exec ? 
query_exec_request.mt_plan_exec_info[0].fragments[0] : query_exec_request.fragments[0]; - bool has_coordinator_fragment = - fragment.partition.type == TPartitionType::UNPARTITIONED; - DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl); { lock_guard<mutex> l(lock_); @@ -449,7 +444,7 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( } coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_)); - status = coord_->Exec(&output_expr_ctxs_); + status = coord_->Exec(); { lock_guard<mutex> l(lock_); RETURN_IF_ERROR(UpdateQueryStatus(status)); @@ -538,12 +533,11 @@ void ImpalaServer::QueryExecState::Done() { query_events_->MarkEvent("Unregister query"); if (coord_.get() != NULL) { - Expr::Close(output_expr_ctxs_, coord_->runtime_state()); // Release any reserved resources. Status status = exec_env_->scheduler()->Release(schedule_.get()); if (!status.ok()) { LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id() - << " because of error: " << status.GetDetail(); + << " because of error: " << status.GetDetail(); } coord_->TearDown(); } @@ -626,7 +620,6 @@ Status ImpalaServer::QueryExecState::WaitInternal() { if (coord_.get() != NULL) { RETURN_IF_ERROR(coord_->Wait()); - RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, coord_->runtime_state())); RETURN_IF_ERROR(UpdateCatalog()); } @@ -719,6 +712,10 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows, return Status::OK(); } + if (coord_.get() == nullptr) { + return Status("Client tried to fetch rows on a query that produces no results."); + } + int32_t num_rows_fetched_from_cache = 0; if (result_cache_max_size_ > 0 && result_cache_ != NULL) { // Satisfy the fetch from the result cache if possible. 
@@ -729,27 +726,7 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows, if (num_rows_fetched_from_cache >= max_rows) return Status::OK(); } - // List of expr values to hold evaluated rows from the query - vector<void*> result_row; - result_row.resize(output_expr_ctxs_.size()); - - // List of scales for floating point values in result_row - vector<int> scales; - scales.resize(result_row.size()); - - if (coord_ == NULL) { - // Query with LIMIT 0. - query_state_ = QueryState::FINISHED; - eos_ = true; - return Status::OK(); - } - query_state_ = QueryState::FINISHED; // results will be ready after this call - // Fetch the next batch if we've returned the current batch entirely - if (current_batch_ == NULL || current_batch_row_ >= current_batch_->num_rows()) { - RETURN_IF_ERROR(FetchNextBatch()); - } - if (current_batch_ == NULL) return Status::OK(); // Maximum number of rows to be fetched from the coord. int32_t max_coord_rows = max_rows; @@ -759,22 +736,26 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows, } { SCOPED_TIMER(row_materialization_timer_); - // Convert the available rows, limited by max_coord_rows - int available = current_batch_->num_rows() - current_batch_row_; - int fetched_count = available; - // max_coord_rows <= 0 means no limit - if (max_coord_rows > 0 && max_coord_rows < available) fetched_count = max_coord_rows; - for (int i = 0; i < fetched_count; ++i) { - TupleRow* row = current_batch_->GetRow(current_batch_row_); - RETURN_IF_ERROR(GetRowValue(row, &result_row, &scales)); - RETURN_IF_ERROR(fetched_rows->AddOneRow(result_row, scales)); - ++num_rows_fetched_; - ++current_batch_row_; + size_t before = fetched_rows->size(); + // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_ + // (already held) ensures that we do not call coord_->GetNext() multiple times + // concurrently. + // TODO: Simplify this. 
+ lock_.unlock(); + Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_); + lock_.lock(); + int num_fetched = fetched_rows->size() - before; + DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute( + "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows); + num_rows_fetched_ += num_fetched; + + RETURN_IF_ERROR(status); + // Check if query status has changed during GetNext() call + if (!query_status_.ok()) { + eos_ = true; + return query_status_; } } - ExprContext::FreeLocalAllocations(output_expr_ctxs_); - // Check if there was an error evaluating a row value. - RETURN_IF_ERROR(coord_->runtime_state()->CheckQueryState()); // Update the result cache if necessary. if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) { @@ -833,16 +814,6 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows, return Status::OK(); } -Status ImpalaServer::QueryExecState::GetRowValue(TupleRow* row, vector<void*>* result, - vector<int>* scales) { - DCHECK(result->size() >= output_expr_ctxs_.size()); - for (int i = 0; i < output_expr_ctxs_.size(); ++i) { - (*result)[i] = output_expr_ctxs_[i]->GetValue(row); - (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale(); - } - return Status::OK(); -} - Status ImpalaServer::QueryExecState::Cancel(bool check_inflight, const Status* cause) { Coordinator* coord; { @@ -931,28 +902,6 @@ Status ImpalaServer::QueryExecState::UpdateCatalog() { return Status::OK(); } -Status ImpalaServer::QueryExecState::FetchNextBatch() { - DCHECK(!eos_); - DCHECK(coord_.get() != NULL); - - // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_ - // ensures that we do not call coord_->GetNext() multiple times concurrently. 
- lock_.unlock(); - Status status = coord_->GetNext(&current_batch_, coord_->runtime_state()); - lock_.lock(); - if (!status.ok()) return status; - - // Check if query status has changed during GetNext() call - if (!query_status_.ok()) { - current_batch_ = NULL; - return query_status_; - } - - current_batch_row_ = 0; - eos_ = current_batch_ == NULL; - return Status::OK(); -} - void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& results) { request_result_set_.reset(new vector<TResultRow>); request_result_set_->resize(results.size()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-exec-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h index 0a763ff..54ee929 100644 --- a/be/src/service/query-exec-state.h +++ b/be/src/service/query-exec-state.h @@ -248,7 +248,7 @@ class ImpalaServer::QueryExecState { /// Resource assignment determined by scheduler. Owned by obj_pool_. boost::scoped_ptr<QuerySchedule> schedule_; - /// not set for ddl queries, or queries with "limit 0" + /// Not set for ddl queries. boost::scoped_ptr<Coordinator> coord_; /// Runs statements that query or modify the catalog via the CatalogService. @@ -293,7 +293,7 @@ class ImpalaServer::QueryExecState { MonotonicStopWatch client_wait_sw_; RuntimeProfile::EventSequence* query_events_; - std::vector<ExprContext*> output_expr_ctxs_; + bool is_cancelled_; // if true, Cancel() was called. bool eos_; // if true, there are no more rows to return // We enforce the invariant that query_status_ is not OK iff query_state_ @@ -356,13 +356,6 @@ class ImpalaServer::QueryExecState { /// Caller needs to hold fetch_rows_lock_ and lock_. Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows); - /// Fetch the next row batch and store the results in current_batch_. Only called for - /// non-DDL / DML queries.
current_batch_ is set to NULL if execution is complete or the - /// query was cancelled. - /// Caller needs to hold fetch_rows_lock_ and lock_. Blocks, during which time lock_ is - /// released. - Status FetchNextBatch(); - /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'. /// result and scales must have been resized to the number of columns before call. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-result-set.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h new file mode 100644 index 0000000..b444ca3 --- /dev/null +++ b/be/src/service/query-result-set.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_SERVICE_QUERY_RESULT_SET_H +#define IMPALA_SERVICE_QUERY_RESULT_SET_H + +#include "common/status.h" +#include "gen-cpp/Data_types.h" + +#include <vector> + +namespace impala { + +/// Stores client-ready query result rows returned by +/// QueryExecState::FetchRows(). 
Subclasses implement AddRows() / AddOneRow() to +/// specialise how Impala's row batches are converted to client-API result +/// representations. +class QueryResultSet { + public: + QueryResultSet() {} + virtual ~QueryResultSet() {} + + /// Add a single row to this result set. The row is a vector of pointers to values, + /// whose memory belongs to the caller. 'scales' contains the scales for decimal values + /// (# of digits after decimal), with -1 indicating no scale specified or the + /// corresponding value is not a decimal. + virtual Status AddOneRow( + const std::vector<void*>& row, const std::vector<int>& scales) = 0; + + /// Add the TResultRow to this result set. When a row comes from a DDL/metadata + /// operation, the row in the form of TResultRow. + virtual Status AddOneRow(const TResultRow& row) = 0; + + /// Copies rows in the range [start_idx, start_idx + num_rows) from the other result + /// set into this result set. Returns the number of rows added to this result set. + /// Returns 0 if the given range is out of bounds of the other result set. + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) = 0; + + /// Returns the approximate size of this result set in bytes. + int64_t ByteSize() { return ByteSize(0, size()); } + + /// Returns the approximate size of the given range of rows in bytes. + virtual int64_t ByteSize(int start_idx, int num_rows) = 0; + + /// Returns the size of this result set in number of rows. 
+ virtual size_t size() = 0; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/testutil/in-process-servers.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc index cf0e28e..b28f7fc 100644 --- a/be/src/testutil/in-process-servers.cc +++ b/be/src/testutil/in-process-servers.cc @@ -34,6 +34,7 @@ DECLARE_string(ssl_server_certificate); DECLARE_string(ssl_private_key); +DECLARE_int32(be_port); using namespace apache::thrift; using namespace impala; @@ -43,6 +44,9 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts( for (int tries = 0; tries < 10; ++tries) { int backend_port = FindUnusedEphemeralPort(); if (backend_port == -1) continue; + // This flag is read directly in several places to find the address of the local + // backend interface. + FLAGS_be_port = backend_port; int subscriber_port = FindUnusedEphemeralPort(); if (subscriber_port == -1) continue; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/common/thrift/DataSinks.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift index 12a75b9..83c63b7 100644 --- a/common/thrift/DataSinks.thrift +++ b/common/thrift/DataSinks.thrift @@ -26,7 +26,8 @@ include "Partitions.thrift" enum TDataSinkType { DATA_STREAM_SINK, TABLE_SINK, - JOIN_BUILD_SINK + JOIN_BUILD_SINK, + PLAN_ROOT_SINK } enum TSinkAction { @@ -87,10 +88,10 @@ struct TJoinBuildSink { // Union type of all table sinks. 
struct TTableSink { - 1: required Types.TTableId target_table_id + 1: required Types.TTableId target_table_id 2: required TTableSinkType type 3: required TSinkAction action - 4: optional THdfsTableSink hdfs_table_sink + 4: optional THdfsTableSink hdfs_table_sink 5: optional TKuduTableSink kudu_table_sink } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java index b02bc73..392b961 100644 --- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java @@ -25,6 +25,8 @@ import java.util.Set; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.TreeNode; +import org.apache.impala.planner.DataSink; +import org.apache.impala.planner.PlanRootSink; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Lists; @@ -409,6 +411,10 @@ public abstract class QueryStmt extends StatementBase { resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true); } + public DataSink createDataSink() { + return new PlanRootSink(); + } + public ArrayList<OrderByElement> cloneOrderByElements() { if (orderByElements_ == null) return null; ArrayList<OrderByElement> result = http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java new file mode 100644 index 0000000..a199f54 --- /dev/null +++ 
b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.planner; + +import org.apache.impala.thrift.TDataSink; +import org.apache.impala.thrift.TDataSinkType; +import org.apache.impala.thrift.TExplainLevel; + +/** + * Sink for the root of a query plan that produces result rows. Allows coordination + * between the sender which produces those rows, and the consumer which sends them to the + * client, despite both executing concurrently. 
+ */ +public class PlanRootSink extends DataSink { + + public String getExplainString(String prefix, String detailPrefix, + TExplainLevel explainLevel) { + return String.format("%sPLAN-ROOT SINK\n", prefix); + } + + protected TDataSink toThrift() { + return new TDataSink(TDataSinkType.PLAN_ROOT_SINK); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/Planner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 405eebe..ed4c677 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -149,6 +149,8 @@ public class Planner { } else if (ctx_.isDelete()) { // Set up delete sink for root fragment rootFragment.setSink(ctx_.getAnalysisResult().getDeleteStmt().createDataSink()); + } else if (ctx_.isQuery()) { + rootFragment.setSink(ctx_.getAnalysisResult().getQueryStmt().createDataSink()); } QueryStmt queryStmt = ctx_.getQueryStmt(); queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/PlannerContext.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java index 29cca13..3275a7a 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java +++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java @@ -90,6 +90,7 @@ public class PlannerContext { public boolean isInsertOrCtas() { return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt(); } + public boolean isQuery() { return analysisResult_.isQueryStmt(); } public boolean hasSubplan() { return 
!subplans_.isEmpty(); } public SubplanNode getSubplan() { return subplans_.getFirst(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test index 47bfb23..d7838f9 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test @@ -3,12 +3,16 @@ select count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tin avg(tinyint_col) from functional.alltypesagg ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | output: count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col) | 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: count:merge(*), count:merge(tinyint_col), min:merge(tinyint_col), max:merge(tinyint_col), sum:merge(tinyint_col), avg:merge(tinyint_col) | @@ -26,6 +30,8 @@ avg(tinyint_col) from functional.alltypesagg group by 2, 1 ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | output: count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col) | group by: bigint_col, tinyint_col @@ -33,6 +39,8 @@ group by 2, 1 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 04:EXCHANGE [UNPARTITIONED] | 03:AGGREGATE [FINALIZE] @@ -54,6 +62,8 @@ from functional.testtbl having count(id) > 0 order by avg(zip) limit 10 ---- PLAN +PLAN-ROOT SINK +| 02:TOP-N [LIMIT=10] | order by: avg(zip) ASC | @@ -64,6 +74,8 @@ order by avg(zip) limit 10 00:SCAN HDFS [functional.testtbl] partitions=1/1 files=0 size=0B 
---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 02:TOP-N [LIMIT=10] | order by: avg(zip) ASC | @@ -85,6 +97,8 @@ from functional.alltypesagg group by int_col + int_col, int_col * int_col, int_col + int_col having (int_col * int_col) < 0 limit 10 ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | group by: int_col + int_col, int_col * int_col | having: int_col * int_col < 0 @@ -93,6 +107,8 @@ having (int_col * int_col) < 0 limit 10 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 04:EXCHANGE [UNPARTITIONED] | limit: 10 | @@ -116,6 +132,8 @@ functional.alltypes t1 inner join functional.alltypestiny t2 group by t1.tinyint_col, t2.smallint_col having count(t2.int_col) = count(t1.bigint_col) ---- PLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: count(*), count(t2.int_col), count(t1.bigint_col) | group by: t1.tinyint_col, t2.smallint_col @@ -141,6 +159,8 @@ select 1 from group by int_col) t where t.x > 10 ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | output: avg(bigint_col) | group by: int_col @@ -157,6 +177,8 @@ select count(*) from select * from functional.alltypessmall) t limit 10 ---- PLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: count(*) | limit: 10 @@ -169,6 +191,8 @@ limit 10 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 05:AGGREGATE [FINALIZE] | output: count:merge(*) | limit: 10 @@ -194,6 +218,8 @@ select count(*) from group by t.bigint_col limit 10 ---- PLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: count(*) | group by: bigint_col @@ -207,6 +233,8 @@ limit 10 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 06:EXCHANGE [UNPARTITIONED] | limit: 10 | @@ -237,6 +265,8 @@ from select * from functional.alltypessmall) t limit 10 ---- PLAN +PLAN-ROOT SINK +| 04:AGGREGATE [FINALIZE] | output: count(int_col) | limit: 10 @@ -252,6 
+282,8 @@ limit 10 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 08:AGGREGATE [FINALIZE] | output: count:merge(int_col) | limit: 10 @@ -286,6 +318,8 @@ from group by t.bigint_col limit 10 ---- PLAN +PLAN-ROOT SINK +| 04:AGGREGATE [FINALIZE] | output: count(int_col) | group by: t.bigint_col @@ -302,6 +336,8 @@ limit 10 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 07:EXCHANGE [UNPARTITIONED] | limit: 10 | @@ -334,6 +370,8 @@ from select * from functional.alltypessmall) t limit 10 ---- PLAN +PLAN-ROOT SINK +| 04:AGGREGATE [FINALIZE] | output: count(int_col), count:merge(smallint_col) | limit: 10 @@ -350,6 +388,8 @@ limit 10 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 08:AGGREGATE [FINALIZE] | output: count:merge(int_col), count:merge(smallint_col) | limit: 10 @@ -386,6 +426,8 @@ from group by t.bigint_col limit 10 ---- PLAN +PLAN-ROOT SINK +| 04:AGGREGATE [FINALIZE] | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col @@ -403,6 +445,8 @@ limit 10 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 07:EXCHANGE [UNPARTITIONED] | limit: 10 | @@ -438,6 +482,8 @@ from group by t.bigint_col limit 10 ---- PLAN +PLAN-ROOT SINK +| 05:AGGREGATE [FINALIZE] | output: count(int_col), count:merge(smallint_col) | group by: t.bigint_col @@ -458,6 +504,8 @@ limit 10 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 10:EXCHANGE [UNPARTITIONED] | limit: 10 | @@ -495,6 +543,8 @@ limit 10 # test that aggregations are not placed below an unpartitioned exchange with a limit select count(*) from (select * from functional.alltypes limit 10) t ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | output: count(*) | @@ -502,6 +552,8 @@ 
select count(*) from (select * from functional.alltypes limit 10) t partitions=24/24 files=24 size=478.45KB limit: 10 ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | output: count(*) | @@ -518,6 +570,8 @@ select count(*) from union all (select * from functional.alltypessmall) limit 10) t ---- PLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: count(*) | @@ -530,6 +584,8 @@ select count(*) from 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: count(*) | @@ -555,6 +611,8 @@ select * from ( limit 2) v limit 1 ---- PLAN +PLAN-ROOT SINK +| 06:AGGREGATE [FINALIZE] | output: count(cnt) | limit: 1 @@ -580,6 +638,8 @@ limit 1 partitions=11/11 files=11 size=814.73KB runtime filters: RF000 -> t1.id ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 06:AGGREGATE [FINALIZE] | output: count(cnt) | limit: 1 @@ -629,6 +689,8 @@ select * from group by 1, 2, 3, 4) v where v.a = v.b and v.b = v.c and v.c = v.d and v.a = v.c and v.a = v.d ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | group by: tinyint_col, smallint_col, int_col + int_col, coalesce(bigint_col, year) | having: int_col + int_col = coalesce(bigint_col, year), smallint_col = int_col + int_col @@ -643,6 +705,8 @@ select cnt from from functional.alltypestiny group by bool_col, x) v ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | output: count(*) | group by: bool_col, CAST(NULL AS INT) @@ -656,6 +720,8 @@ select cnt from from functional.alltypestiny group by bool_col, x) v ---- PLAN +PLAN-ROOT SINK +| 02:AGGREGATE [FINALIZE] | output: count(int_col) | group by: bool_col, NULL @@ -669,6 +735,8 @@ select cnt from # test simple group_concat with distinct select group_concat(distinct string_col) from functional.alltypesagg ---- PLAN +PLAN-ROOT SINK +| 02:AGGREGATE [FINALIZE] | output: group_concat(string_col) | @@ -678,6 +746,8 @@ select group_concat(distinct string_col) from functional.alltypesagg 
00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 06:AGGREGATE [FINALIZE] | output: group_concat:merge(string_col) | @@ -702,6 +772,8 @@ select day, group_concat(distinct string_col) from (select * from functional.alltypesagg where id % 100 = day order by id limit 99999) a group by day ---- PLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: group_concat(string_col) | group by: day @@ -716,6 +788,8 @@ group by day partitions=11/11 files=11 size=814.73KB predicates: id % 100 = day ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 03:AGGREGATE [FINALIZE] | output: group_concat(string_col) | group by: day @@ -739,6 +813,8 @@ select count(distinct cast(timestamp_col as string)), group_concat(distinct cast(timestamp_col as string)) from functional.alltypesagg group by year ---- PLAN +PLAN-ROOT SINK +| 02:AGGREGATE [FINALIZE] | output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING)) | group by: year @@ -749,6 +825,8 @@ from functional.alltypesagg group by year 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 05:EXCHANGE [UNPARTITIONED] | 02:AGGREGATE [FINALIZE] @@ -769,6 +847,8 @@ from functional.alltypesagg group by year # test group_concat distinct with other non-distinct aggregate functions select group_concat(distinct string_col), count(*) from functional.alltypesagg ---- PLAN +PLAN-ROOT SINK +| 02:AGGREGATE [FINALIZE] | output: group_concat(string_col), count:merge(*) | @@ -779,6 +859,8 @@ from functional.alltypesagg group by year 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 06:AGGREGATE [FINALIZE] | output: group_concat:merge(string_col), count:merge(*) | @@ -804,6 +886,8 @@ from functional.alltypesagg group by year select group_concat(distinct string_col, '-'), sum(int_col), count(distinct string_col) from functional.alltypesagg 
---- PLAN +PLAN-ROOT SINK +| 02:AGGREGATE [FINALIZE] | output: group_concat(string_col, '-'), count(string_col), sum:merge(int_col) | @@ -814,6 +898,8 @@ from functional.alltypesagg 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 06:AGGREGATE [FINALIZE] | output: group_concat:merge(string_col, '-'), count:merge(string_col), sum:merge(int_col) | @@ -841,6 +927,8 @@ select month, year, count(*), count(distinct date_string_col), group_concat(distinct date_string_col, '-') from functional.alltypesagg group by month, year ---- PLAN +PLAN-ROOT SINK +| 02:AGGREGATE [FINALIZE] | output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*) | group by: month, year @@ -852,6 +940,8 @@ group by month, year 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 05:EXCHANGE [UNPARTITIONED] | 02:AGGREGATE [FINALIZE] @@ -875,6 +965,8 @@ group by month, year select group_concat(distinct string_col), group_concat(distinct string_col, '-'), group_concat(distinct string_col, '---') from functional.alltypesagg ---- PLAN +PLAN-ROOT SINK +| 02:AGGREGATE [FINALIZE] | output: group_concat(string_col), group_concat(string_col, '-'), group_concat(string_col, '---') | @@ -884,6 +976,8 @@ group_concat(distinct string_col, '---') from functional.alltypesagg 00:SCAN HDFS [functional.alltypesagg] partitions=11/11 files=11 size=814.73KB ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 06:AGGREGATE [FINALIZE] | output: group_concat:merge(string_col), group_concat:merge(string_col, '-'), group_concat:merge(string_col, '---') | @@ -906,6 +1000,8 @@ group_concat(distinct string_col, '---') from functional.alltypesagg # IMPALA-852: Aggregation only in the HAVING clause. 
select 1 from functional.alltypestiny having count(*) > 0 ---- PLAN +PLAN-ROOT SINK +| 01:AGGREGATE [FINALIZE] | output: count(*) | having: count(*) > 0 @@ -923,6 +1019,8 @@ group by 1 having count(*) < 150000 limit 1000000 ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 06:EXCHANGE [UNPARTITIONED] | limit: 1000000 | @@ -957,6 +1055,8 @@ select col from ( where col > 50 limit 50 ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 08:AGGREGATE [FINALIZE] | output: count:merge(c_custkey) | having: count(c_custkey) > 50 @@ -992,6 +1092,8 @@ select straight_join c_custkey, count(distinct c_custkey) from tpch_parquet.orders inner join [shuffle] tpch_parquet.customer on c_custkey = o_custkey group by 1 ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 07:EXCHANGE [UNPARTITIONED] | 04:AGGREGATE [FINALIZE] @@ -1029,6 +1131,8 @@ group by 1, 2 having count(*) > 10 limit 10 ---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| 09:EXCHANGE [UNPARTITIONED] | limit: 10 |