IMPALA-6792: Fail status reporting if coordinator refuses connections

The ReportExecStatusAux() function is run on a dedicated thread per
fragment instance. This thread will run until the fragment instance
finishes executing.
On every attempt to send a report to the coordinator, the thread tries
up to 3 RPCs. If all 3 of them fail, the fragment instance cancels
itself. However, there is one case where a failure to send the RPC is
not counted as a failed RPC: if, when we attempt to obtain a
connection, we end up creating a new one (via
ClientCache::CreateClient()) instead of getting a previously cached
connection, and this new connection fails to even Open(), the failure
is not counted as an RPC failure.

This patch counts such an error as a failed RPC too.

This patch also clarifies some of the error log messages and
introduces a flag to control the sleep interval between failed
ReportExecStatus RPC retries.

Change-Id: If668838f99f78b5ffa713488178b2eb5799ba220
Reviewed-on: http://gerrit.cloudera.org:8080/9916
Reviewed-by: Sailesh Mukil <sail...@cloudera.com>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4d6b07f0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4d6b07f0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4d6b07f0

Branch: refs/heads/master
Commit: 4d6b07f0e2d18c8f5284d27b82bcee3aa9f1fe77
Parents: c9f4fdc
Author: Sailesh Mukil <sail...@cloudera.com>
Authored: Tue Apr 3 14:24:21 2018 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Apr 5 02:17:10 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/query-state.cc | 34 +++++++++++++++++++++++++---------
 1 file changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/impala/blob/4d6b07f0/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index ad5748f..04a4283 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -38,6 +38,10 @@

 #include "common/names.h"

+DEFINE_int32(report_status_retry_interval_ms, 100,
+    "The interval in milliseconds to wait before retrying a failed status report RPC to "
+    "the coordinator.");
+
 using namespace impala;

 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
@@ -249,28 +253,40 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   DCHECK_EQ(res.status.status_code, TErrorCode::OK);
   // Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
   // It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
+  Status client_status;
   for (int i = 0; i < 3; ++i) {
-    Status client_status;
     ImpalaBackendConnection client(ExecEnv::GetInstance()->impalad_client_cache(),
         query_ctx().coord_address, &client_status);
     if (client_status.ok()) {
       rpc_status = client.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
       if (rpc_status.ok()) break;
     }
-    if (i < 2) SleepForMs(100);
+    if (i < 2) SleepForMs(FLAGS_report_status_retry_interval_ms);
   }
   Status result_status(res.status);
-  if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
+  if ((!client_status.ok() || !rpc_status.ok() || !result_status.ok()) &&
+      instances_started) {
     // TODO: should we try to keep rpc_status for the final report? (but the final
     // report, following this Cancel(), may not succeed anyway.)
     // TODO: not keeping an error status here means that all instances might
     // abort with CANCELLED status, despite there being an error
-    if (!rpc_status.ok()) {
-      // TODO: Fix IMPALA-2990. Cancelling fragment instances here may cause query to
-      // hang as the coordinator may not be aware of the cancellation. Remove the log
-      // statement once IMPALA-2990 is fixed.
-      LOG(ERROR) << "Cancelling fragment instances due to failure to report status. "
-                 << "Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    // TODO: Fix IMPALA-2990. Cancelling fragment instances without sending the
+    // ReporExecStatus RPC may cause query to hang as the coordinator may not be aware
+    // of the cancellation. Remove the log statements once IMPALA-2990 is fixed.
+    if (!client_status.ok()) {
+      LOG(ERROR) << "Cancelling fragment instances due to failure to obtain a "
+                 << "connection to the coordinator. (" << client_status.GetDetail()
+                 << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    } else if (!rpc_status.ok()) {
+      LOG(ERROR) << "Cancelling fragment instances due to failure to reach the "
+                 << "coordinator. (" << rpc_status.GetDetail()
+                 << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    } else if (!result_status.ok()) {
+      // If the ReportExecStatus RPC succeeded in reaching the coordinator and we get
+      // back a non-OK status, it means that the coordinator expects us to cancel the
+      // fragment instances for this query.
+      LOG(INFO) << "Cancelling fragment instances as directed by the coordinator. "
+                << "Returned status: " << result_status.GetDetail();
     }
     Cancel();
   }