This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 65b382c17d4 [Fix](Outfile) Fixed the problem that the concurrent Outfile wrote multiple Success files (#33016) 65b382c17d4 is described below commit 65b382c17d4077fdeb76901c6d2fcb006176d7a3 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Tue Apr 9 14:51:44 2024 +0800 [Fix](Outfile) Fixed the problem that the concurrent Outfile wrote multiple Success files (#33016) **Problem:** When we enable concurrent `Outfile` and specify the `success_file_name`, a SUCCESS file is written for each BE instance, which is not what we expected. **Solution:** We added a new RPC request: once the Outfile has completed, the FE sends an RPC to a single BE asking it to write one Success file. --- be/src/service/internal_service.cpp | 80 ++++++++++++++++++++++ be/src/service/internal_service.h | 5 ++ be/src/vec/sink/writer/vfile_result_writer.cpp | 35 ---------- be/src/vec/sink/writer/vfile_result_writer.h | 2 - .../org/apache/doris/analysis/OutFileClause.java | 8 +++ .../doris/nereids/glue/LogicalPlanAdapter.java | 17 +++++ .../main/java/org/apache/doris/qe/Coordinator.java | 8 +++ .../java/org/apache/doris/qe/StmtExecutor.java | 78 +++++++++++++++++++++ .../org/apache/doris/rpc/BackendServiceClient.java | 5 ++ .../org/apache/doris/rpc/BackendServiceProxy.java | 12 ++++ gensrc/proto/internal_service.proto | 12 ++++ .../suites/export_p0/test_outfile.groovy | 5 +- .../suites/nereids_p0/outfile/test_outfile.groovy | 6 +- 13 files changed, 230 insertions(+), 43 deletions(-) diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index ab8b2f9cb66..adcd81689c1 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -26,6 +26,7 @@ #include <butil/iobuf.h> #include <fcntl.h> #include <fmt/core.h> +#include <gen_cpp/DataSinks_types.h> #include <gen_cpp/MasterService_types.h> #include <gen_cpp/PaloInternalService_types.h> 
#include <gen_cpp/PlanNodes_types.h> @@ -615,6 +616,85 @@ void PInternalService::fetch_data(google::protobuf::RpcController* controller, } } +void PInternalService::outfile_write_success(google::protobuf::RpcController* controller, + const POutfileWriteSuccessRequest* request, + POutfileWriteSuccessResult* result, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, result, done]() { + VLOG_RPC << "outfile write success file"; + brpc::ClosureGuard closure_guard(done); + TResultFileSink result_file_sink; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)(request->result_file_sink().data()); + uint32_t len = request->result_file_sink().size(); + st = deserialize_thrift_msg(buf, &len, false, &result_file_sink); + if (!st.ok()) { + LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + + TResultFileSinkOptions file_options = result_file_sink.file_options; + std::stringstream ss; + ss << file_options.file_path << file_options.success_file_name; + std::string file_name = ss.str(); + if (result_file_sink.storage_backend_type == TStorageBackendType::LOCAL) { + // For local file writer, the file_path is a local dir. + // Here we do a simple security verification by checking whether the file exists. + // Because the file path is currently arbitrarily specified by the user, + // Doris is not responsible for ensuring the correctness of the path. + // This is just to prevent overwriting the existing file. 
+ bool exists = true; + st = io::global_local_filesystem()->exists(file_name, &exists); + if (!st.ok()) { + LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + if (exists) { + st = Status::InternalError("File already exists: {}", file_name); + } + if (!st.ok()) { + LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + + auto&& res = FileFactory::create_file_writer( + FileFactory::convert_storage_type(result_file_sink.storage_backend_type), + ExecEnv::GetInstance(), file_options.broker_addresses, + file_options.broker_properties, file_name); + using T = std::decay_t<decltype(res)>; + if (!res.has_value()) [[unlikely]] { + st = std::forward<T>(res).error(); + st.to_protobuf(result->mutable_status()); + return; + } + + std::unique_ptr<doris::io::FileWriter> _file_writer_impl = std::forward<T>(res).value(); + // must write somthing because s3 file writer can not writer empty file + st = _file_writer_impl->append({"success"}); + if (!st.ok()) { + LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + st = _file_writer_impl->close(); + if (!st.ok()) { + LOG(WARNING) << "outfile write success filefailed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + }); + if (!ret) { + offer_failed(result, done, _heavy_work_pool); + return; + } +} + void PInternalService::fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index bdb91a0bdf7..fdf3a183d96 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -96,6 +96,11 @@ public: void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, 
PFetchDataResult* result, google::protobuf::Closure* done) override; + void outfile_write_success(google::protobuf::RpcController* controller, + const POutfileWriteSuccessRequest* request, + POutfileWriteSuccessResult* result, + google::protobuf::Closure* done) override; + void fetch_table_schema(google::protobuf::RpcController* controller, const PFetchTableSchemaRequest* request, PFetchTableSchemaResult* result, diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index f61f65b998f..811658afa4d 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -99,37 +99,6 @@ void VFileResultWriter::_init_profile(RuntimeProfile* parent_profile) { _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); } -Status VFileResultWriter::_create_success_file() { - std::string file_name; - RETURN_IF_ERROR(_get_success_file_name(&file_name)); - _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer( - FileFactory::convert_storage_type(_storage_type), _state->exec_env(), - _file_opts->broker_addresses, _file_opts->broker_properties, file_name)); - // must write somthing because s3 file writer can not writer empty file - RETURN_IF_ERROR(_file_writer_impl->append({"success"})); - return _file_writer_impl->close(); -} - -Status VFileResultWriter::_get_success_file_name(std::string* file_name) { - std::stringstream ss; - ss << _file_opts->file_path << _file_opts->success_file_name; - *file_name = ss.str(); - if (_storage_type == TStorageBackendType::LOCAL) { - // For local file writer, the file_path is a local dir. - // Here we do a simple security verification by checking whether the file exists. - // Because the file path is currently arbitrarily specified by the user, - // Doris is not responsible for ensuring the correctness of the path. - // This is just to prevent overwriting the existing file. 
- bool exists = true; - RETURN_IF_ERROR(io::global_local_filesystem()->exists(*file_name, &exists)); - if (exists) { - return Status::InternalError("File already exists: {}", *file_name); - } - } - - return Status::OK(); -} - Status VFileResultWriter::_create_next_file_writer() { std::string file_name; RETURN_IF_ERROR(_get_next_file_name(&file_name)); @@ -275,10 +244,6 @@ Status VFileResultWriter::_close_file_writer(bool done) { RETURN_IF_ERROR(_create_next_file_writer()); } else { // All data is written to file, send statistic result - if (_file_opts->success_file_name != "") { - // write success file, just need to touch an empty file - RETURN_IF_ERROR(_create_success_file()); - } if (_output_block == nullptr) { RETURN_IF_ERROR(_send_result()); } else { diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 29fd6d89cd3..72ba90cd015 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -79,10 +79,8 @@ private: Status _create_file_writer(const std::string& file_name); Status _create_next_file_writer(); - Status _create_success_file(); // get next export file name Status _get_next_file_name(std::string* file_name); - Status _get_success_file_name(std::string* file_name); void _get_file_url(std::string* file_url); std::string _file_format_to_name(); // close file writer, and if !done, it will create new writer for next file. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 8c975d4ea0d..0314b225e67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -778,6 +778,14 @@ public class OutFileClause { return fileFormatType == TFileFormatType.FORMAT_ORC; } + public String getFilePath() { + return filePath; + } + + public String getSuccessFileName() { + return successFileName; + } + @Override public OutFileClause clone() { return new OutFileClause(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java index 72dd6c11e9b..8fdcbb1198d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java @@ -24,11 +24,14 @@ import org.apache.doris.analysis.Queriable; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.StatementBase; import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import com.google.common.collect.Lists; + import java.util.ArrayList; import java.util.List; @@ -66,6 +69,20 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable { @Override public OutFileClause getOutFileClause() { + if (logicalPlan instanceof LogicalFileSink) { + LogicalFileSink fileSink = (LogicalFileSink) logicalPlan; + OutFileClause outFile = new OutFileClause( + fileSink.getFilePath(), + 
fileSink.getFormat(), + fileSink.getProperties() + ); + try { + outFile.analyze(null, Lists.newArrayList(), Lists.newArrayList()); + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e.getCause()); + } + return outFile; + } return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 40400844675..82b7cef3607 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -4086,6 +4086,14 @@ public class Coordinator implements CoordInterface { return result; } + public Map<PlanFragmentId, FragmentExecParams> getFragmentExecParamsMap() { + return fragmentExecParamsMap; + } + + public List<PlanFragment> getFragments() { + return fragments; + } + // Runtime filter target fragment instance param static class FRuntimeFilterTargetParam { public TUniqueId targetFragmentInstanceId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ac5e3221355..5bbe01ef793 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -65,6 +65,8 @@ import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.StmtRewriter; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.SwitchStmt; import org.apache.doris.analysis.TableName; @@ -82,6 +84,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EnvFactory; +import org.apache.doris.catalog.FsBroker; import 
org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; @@ -112,6 +115,7 @@ import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; +import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; @@ -148,19 +152,26 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.planner.DataSink; import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.GroupCommitScanNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OriginalPlanner; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ResultFileSink; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; +import org.apache.doris.proto.InternalService.POutfileWriteSuccessRequest; +import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult; import org.apache.doris.proto.Types; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.ConnectContext.ConnectType; +import org.apache.doris.qe.Coordinator.FragmentExecParams; import 
org.apache.doris.qe.QeProcessorImpl.QueryInfo; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.cache.Cache; @@ -168,6 +179,7 @@ import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; @@ -185,6 +197,8 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TResultBatch; +import org.apache.doris.thrift.TResultFileSink; +import org.apache.doris.thrift.TResultFileSinkOptions; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TSyncLoadForTabletsRequest; @@ -207,6 +221,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import java.io.IOException; import java.io.StringReader; @@ -220,6 +235,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; @@ -1800,6 +1816,9 @@ public class StmtExecutor { if (!isOutfileQuery) { sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs())); } else { + if (!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) { + outfileWriteSuccess(queryStmt.getOutFileClause()); + } sendFields(OutFileClause.RESULT_COL_NAMES, OutFileClause.RESULT_COL_TYPES); } 
isSendFields = true; @@ -1857,6 +1876,65 @@ public class StmtExecutor { } } + private void outfileWriteSuccess(OutFileClause outFileClause) throws Exception { + // 1. set TResultFileSinkOptions + TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions(); + + // 2. set brokerNetAddress + List<PlanFragment> fragments = coord.getFragments(); + Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = coord.getFragmentExecParamsMap(); + PlanFragmentId topId = fragments.get(0).getFragmentId(); + FragmentExecParams topParams = fragmentExecParamsMap.get(topId); + DataSink topDataSink = topParams.fragment.getSink(); + TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; + if (topDataSink instanceof ResultFileSink + && ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) { + // set the broker address for OUTFILE sink + ResultFileSink topResultFileSink = (ResultFileSink) topDataSink; + FsBroker broker = Env.getCurrentEnv().getBrokerMgr() + .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname()); + sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port))); + } + + // 3. set TResultFileSink properties + TResultFileSink sink = new TResultFileSink(); + sink.setFileOptions(sinkOptions); + StorageType storageType = outFileClause.getBrokerDesc() == null + ? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType(); + sink.setStorageBackendType(storageType.toThrift()); + + // 4. get BE + TNetworkAddress address = null; + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + break; + } + } + if (address == null) { + throw new AnalysisException("No Alive backends"); + } + + // 5. 
send rpc to BE + POutfileWriteSuccessRequest request = POutfileWriteSuccessRequest.newBuilder() + .setResultFileSink(ByteString.copyFrom(new TSerializer().serialize(sink))).build(); + Future<POutfileWriteSuccessResult> future = BackendServiceProxy.getInstance() + .outfileWriteSuccessAsync(address, request); + InternalService.POutfileWriteSuccessResult result = future.get(); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + String errMsg; + if (code != TStatusCode.OK) { + if (!result.getStatus().getErrorMsgsList().isEmpty()) { + errMsg = result.getStatus().getErrorMsgsList().get(0); + } else { + errMsg = "Outfile write success file failed. backend address: " + + NetUtils + .getHostPortInAccessibleFormat(address.getHostname(), address.getPort()); + } + throw new AnalysisException(errMsg); + } + } + private void handleTransactionStmt() throws Exception { if (context.getConnectType() == ConnectType.MYSQL) { // Every time set no send flag and clean all data in buffer diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 3f4bb846767..50afc7c96bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -106,6 +106,11 @@ public class BackendServiceClient { return stub.fetchArrowFlightSchema(request); } + public Future<InternalService.POutfileWriteSuccessResult> outfileWriteSuccessAsync( + InternalService.POutfileWriteSuccessRequest request) { + return stub.outfileWriteSuccess(request); + } + public Future<InternalService.PFetchTableSchemaResult> fetchTableStructureAsync( InternalService.PFetchTableSchemaRequest request) { return stub.fetchTableSchema(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 
5c8251f5e12..e541b0eb689 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -325,6 +325,18 @@ public class BackendServiceProxy { } } + public Future<InternalService.POutfileWriteSuccessResult> outfileWriteSuccessAsync(TNetworkAddress address, + InternalService.POutfileWriteSuccessRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.outfileWriteSuccessAsync(request); + } catch (Throwable e) { + LOG.warn("outfile write success file catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future<InternalService.PFetchTableSchemaResult> fetchTableStructureAsync( TNetworkAddress address, InternalService.PFetchTableSchemaRequest request) throws RpcException { try { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index b6579323aae..1a6dad5521b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -691,6 +691,17 @@ message PFetchTableSchemaResult { repeated PTypeDesc column_types = 4; } +message POutfileWriteSuccessRequest { + // optional string file_path = 1; + // optional string success_file_name = 2; + // map<string, string> broker_properties = 4; // only for remote file + optional bytes result_file_sink = 1; +} + +message POutfileWriteSuccessResult { + optional PStatus status = 1; +} + message PJdbcTestConnectionRequest { optional bytes jdbc_table = 1; optional int32 jdbc_table_type = 2; @@ -954,6 +965,7 @@ service PBackendService { rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse); rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns (PTabletWriteSlaveResult); rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest) returns (PTabletWriteSlaveDoneResult); + rpc 
outfile_write_success(POutfileWriteSuccessRequest) returns (POutfileWriteSuccessResult); rpc fetch_table_schema(PFetchTableSchemaRequest) returns (PFetchTableSchemaResult); rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse); rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns (PGetFileCacheMetaResponse); diff --git a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy index 76c5bb688c4..8b60803e185 100644 --- a/regression-test/suites/export_p0/test_outfile.groovy +++ b/regression-test/suites/export_p0/test_outfile.groovy @@ -210,9 +210,8 @@ suite("test_outfile") { (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");""" sql "set enable_parallel_outfile = true;" sql """select * from select_into_file into outfile "file://${outFilePath}/";""" - // TODO: parallel outfile is not compatible with success_file_name. remove this case temporary - // sql "set enable_parallel_outfile = true;" - // sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");""" + + sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");""" } finally { try_sql("DROP TABLE IF EXISTS select_into_file") File path = new File(outFilePath) diff --git a/regression-test/suites/nereids_p0/outfile/test_outfile.groovy b/regression-test/suites/nereids_p0/outfile/test_outfile.groovy index 1cfdd7b62ce..f256df91809 100644 --- a/regression-test/suites/nereids_p0/outfile/test_outfile.groovy +++ b/regression-test/suites/nereids_p0/outfile/test_outfile.groovy @@ -236,9 +236,9 @@ suite("test_outfile") { (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");""" sql "set enable_parallel_outfile = true;" sql """select * from select_into_file into outfile "file://${outFilePath}/";""" - // TODO: parallel outfile is not compatible with success_file_name. 
remove this case temporary - // sql "set enable_parallel_outfile = true;" - // sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");""" + + sql "set enable_parallel_outfile = true;" + sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");""" } finally { try_sql("DROP TABLE IF EXISTS select_into_file") File path = new File(outFilePath) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org