Joe McDonnell has posted comments on this change. ( http://gerrit.cloudera.org:8080/17144 )
Change subject: IMPALA-10551: Add result sink support for external frontends
......................................................................

Patch Set 3:

(7 comments)

My general thought is that a query with a result sink is a lot more like a DML than a query, so it would be useful to reuse that codepath with some minor changes. These comments are a bit disorganized, so let me know if the idea isn't really coming through.

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.h@308
PS3, Line 308: /// Non-null if and only if the query produces results for the client; i.e. is of
: /// TStmtType::QUERY. Coordinator uses these to pull results from plan tree and return
: /// them to the client in GetNext(), and also to access the fragment instance's runtime
: /// state.
: ///
: /// Result rows are materialized by this fragment instance in its own thread. They are
: /// materialized into a QueryResultSet provided to the coordinator during GetNext().
: ///
: /// Owned by the QueryState. Set in Exec().
: FragmentInstanceState* coord_instance_ = nullptr;
The way we are using coord_instance_ for a query with a result sink doesn't match this comment. I'm thinking coord_instance_ should probably remain null when doing a QUERY with a result sink. The client wouldn't be getting rows out of GetNext().

Having coord_instance_ == nullptr doesn't mean there is no fragment on the coordinator. It just means that we don't really use it in coordinator.cc. We currently only use coord_instance_ as a place to store whether this has a result sink. Maybe we could pass that fact in separately.

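To make the "pass that fact in separately" idea concrete, here is a rough standalone sketch. It is not Impala code: `ExecParamsSketch`, `CoordinatorSketch`, `ReturnsRowsToClient()`, and the empty `FragmentInstanceState` stand-in are hypothetical names invented for illustration. It only shows the shape of the suggestion: the coordinator stores the result-sink flag itself and leaves `coord_instance_` null unless the client will pull rows through GetNext().

```cpp
#include <cassert>

// Hypothetical stand-ins for the real Impala types; only the shape matters.
enum class StmtType { QUERY, DML };

struct FragmentInstanceState {};  // placeholder for the coordinator fragment instance

struct ExecParamsSketch {
  StmtType stmt_type;
  bool has_coord_fragment;  // a fragment instance runs on the coordinator
  bool has_result_sink;     // results are written to a filesystem, not streamed
};

class CoordinatorSketch {
 public:
  explicit CoordinatorSketch(const ExecParamsSketch& params)
    : stmt_type_(params.stmt_type), has_result_sink_(params.has_result_sink) {}

  // Loosely mirrors the "set coord_instance_" step in Exec(): the instance is
  // only kept when the client will fetch rows through GetNext().
  void Exec(const ExecParamsSketch& params, FragmentInstanceState* instance) {
    if (params.has_coord_fragment && ReturnsRowsToClient()) {
      coord_instance_ = instance;
    }
  }

  // True only for a plain QUERY without a result sink.
  bool ReturnsRowsToClient() const {
    return stmt_type_ == StmtType::QUERY && !has_result_sink_;
  }

  bool has_result_sink() const { return has_result_sink_; }
  const FragmentInstanceState* coord_instance() const { return coord_instance_; }

 private:
  StmtType stmt_type_;
  bool has_result_sink_;  // stored directly rather than read off coord_instance_
  FragmentInstanceState* coord_instance_ = nullptr;
};
```

With this shape, a QUERY with a result sink has a fragment on the coordinator but a null `coord_instance_`, which keeps the header comment accurate.
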
http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.cc@193
PS3, Line 193: // set coord_instance_ and coord_sink_
: if (exec_params_.GetCoordFragment() != nullptr) {
:   // this blocks until all fragment instances have finished their Prepare phase
:   Status query_status = query_state_->GetFInstanceState(query_id(), &coord_instance_);
:   if (!query_status.ok()) return UpdateExecState(query_status, nullptr, FLAGS_hostname);
:   // We expected this query to have a coordinator instance.
:   DCHECK(coord_instance_ != nullptr);
:   // When writing results to file(s), we do not setup a coord_sink
:   if (!coord_instance_->HasResultSink()) {
:     // When GetFInstanceState() returns the coordinator instance, the Prepare phase is
:     // done and the FragmentInstanceState's root sink will be set up.
:     coord_sink_ = coord_instance_->GetRootSink();
:     DCHECK(coord_sink_ != nullptr);
:   }
: }
As mentioned in coordinator.h, I think coord_instance_ should be nullptr for a query with a result sink, so it probably makes more sense for QueryExecParams::GetCoordFragment() to return nullptr when there is a result sink.

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.cc@798
PS3, Line 798: Status Coordinator::FinalizeResultSink() {
I would prefer to review this when I can see how it is being called.

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.cc@805
PS3, Line 805: RETURN_IF_ERROR(UpdateExecState(Status::OK(), nullptr, FLAGS_hostname));
If this returns an error (which it should if the query is not successful), it won't continue to the hdfs_table->ReleaseResources() below and do cleanup.

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.cc@894
PS3, Line 894: // DML finalization can only happen when all backends have completed all side-effects
: // and reported relevant state. We also wait for backends to complete if we are writing
: // results to a filesystem.
: // External frontends can request to have query results written to a filesystem rather
: // than streaming them to a client directly. For these types of queries we need to wait
: // for all backends to complete.
: if (stmt_type_ == TStmtType::DML ||
:     (stmt_type_ == TStmtType::QUERY && coord_instance_->HasResultSink())) {
:   WaitForBackends();
: }
If the below if statement does not apply to queries with result sinks, then this can move back to where it was before with a modified DCHECK.

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.cc@905
PS3, Line 905: if (stmt_type_ == TStmtType::QUERY) {
I think it would make sense to treat a query with a result sink like a DML rather than a QUERY. So, I would make this if block only execute for a query where there is not a result sink. Then it would fall through to the DML case, which would be broadened to be DML + QUERY with result sink.

http://gerrit.cloudera.org:8080/#/c/17144/3/be/src/runtime/coordinator.cc@940
PS3, Line 940: || coord_instance_->HasResultSink()
If a query with a result sink goes through the current DML path, then it would execute SetNonErrorTerminalState with RETURNED_RESULTS. That would satisfy the other condition on this if, so this would not be needed.

Without this change, a successful execution never transitions to a terminal state. DMLs transition in Wait(). Queries transition in GetNext() when *eos=true. This does neither, so it never moves from EXECUTING to RETURNED_RESULTS.

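A simplified model of the control flow I am suggesting for Wait(), in case the comments above are hard to follow. This is a sketch under assumptions, not the real coordinator: `WaitSketch`, `WaitResult`, `UsesDmlPath`, and the `finalize_ok` parameter are hypothetical, and the backend wait and finalization are reduced to flags. It shows two things: a QUERY with a result sink takes the DML path and reaches RETURNED_RESULTS in Wait(), and resources are released whether or not finalization succeeds.

```cpp
#include <cassert>

// Hypothetical, simplified stand-ins for the coordinator's enums.
enum class StmtType { QUERY, DML };
enum class ExecState { EXECUTING, RETURNED_RESULTS, ERROR };

struct WaitResult {
  bool waited_for_backends = false;
  bool resources_released = false;
  ExecState state = ExecState::EXECUTING;
};

// True when Wait() should take the DML-style path: a real DML, or a QUERY
// whose results are written to a filesystem by a result sink.
inline bool UsesDmlPath(StmtType type, bool has_result_sink) {
  return type == StmtType::DML ||
      (type == StmtType::QUERY && has_result_sink);
}

// finalize_ok models whether the query and its finalization step succeeded.
inline WaitResult WaitSketch(StmtType type, bool has_result_sink, bool finalize_ok) {
  WaitResult result;
  if (!UsesDmlPath(type, has_result_sink)) {
    // Plain query: the client pulls rows through GetNext(), which performs the
    // transition to RETURNED_RESULTS at eos. Wait() leaves the state alone.
    return result;
  }
  // DML-style path: every backend must finish before finalization.
  result.waited_for_backends = true;
  // Release resources on both outcomes, instead of returning early on error
  // and skipping the cleanup.
  result.resources_released = true;
  result.state = finalize_ok ? ExecState::RETURNED_RESULTS : ExecState::ERROR;
  return result;
}
```

In this model the extra HasResultSink() check at line 940 is unnecessary, because the DML path already performs the terminal transition.
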
--
To view, visit http://gerrit.cloudera.org:8080/17144
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I024bf41d77bb81f1ab0debdbd31ec3687c83f072
Gerrit-Change-Number: 17144
Gerrit-PatchSet: 3
Gerrit-Owner: John Sherman
Gerrit-Reviewer: Aman Sinha
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Joe McDonnell
Gerrit-Reviewer: John Sherman
Gerrit-Reviewer: Thomas Tauber-Marshall
Gerrit-Comment-Date: Wed, 10 Mar 2021 00:38:21 +0000
Gerrit-HasComments: Yes
