This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 65b382c17d4 [Fix](Outfile) Fixed the problem that the concurrent 
Outfile wrote multiple Success files (#33016)
65b382c17d4 is described below

commit 65b382c17d4077fdeb76901c6d2fcb006176d7a3
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Tue Apr 9 14:51:44 2024 +0800

    [Fix](Outfile) Fixed the problem that the concurrent Outfile wrote multiple 
Success files (#33016)
    
    **Problem:**
    When we enable concurrent `Outfile` and specify the `success_file_name`, a 
SUCCESS file is written by each BE instance, which is not what we expected.
    
    
    **Solution:**
    Therefore, we added a new RPC request: when the Outfile is completed, the 
FE sends an RPC to a single BE, requesting it to write one Success file.
---
 be/src/service/internal_service.cpp                | 80 ++++++++++++++++++++++
 be/src/service/internal_service.h                  |  5 ++
 be/src/vec/sink/writer/vfile_result_writer.cpp     | 35 ----------
 be/src/vec/sink/writer/vfile_result_writer.h       |  2 -
 .../org/apache/doris/analysis/OutFileClause.java   |  8 +++
 .../doris/nereids/glue/LogicalPlanAdapter.java     | 17 +++++
 .../main/java/org/apache/doris/qe/Coordinator.java |  8 +++
 .../java/org/apache/doris/qe/StmtExecutor.java     | 78 +++++++++++++++++++++
 .../org/apache/doris/rpc/BackendServiceClient.java |  5 ++
 .../org/apache/doris/rpc/BackendServiceProxy.java  | 12 ++++
 gensrc/proto/internal_service.proto                | 12 ++++
 .../suites/export_p0/test_outfile.groovy           |  5 +-
 .../suites/nereids_p0/outfile/test_outfile.groovy  |  6 +-
 13 files changed, 230 insertions(+), 43 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index ab8b2f9cb66..adcd81689c1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -26,6 +26,7 @@
 #include <butil/iobuf.h>
 #include <fcntl.h>
 #include <fmt/core.h>
+#include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/MasterService_types.h>
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
@@ -615,6 +616,85 @@ void 
PInternalService::fetch_data(google::protobuf::RpcController* controller,
     }
 }
 
+void PInternalService::outfile_write_success(google::protobuf::RpcController* 
controller,
+                                             const 
POutfileWriteSuccessRequest* request,
+                                             POutfileWriteSuccessResult* 
result,
+                                             google::protobuf::Closure* done) {
+    bool ret = _heavy_work_pool.try_offer([request, result, done]() {
+        VLOG_RPC << "outfile write success file";
+        brpc::ClosureGuard closure_guard(done);
+        TResultFileSink result_file_sink;
+        Status st = Status::OK();
+        {
+            const uint8_t* buf = (const 
uint8_t*)(request->result_file_sink().data());
+            uint32_t len = request->result_file_sink().size();
+            st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
+            if (!st.ok()) {
+                LOG(WARNING) << "outfile write success filefailed, errmsg=" << 
st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+
+        TResultFileSinkOptions file_options = result_file_sink.file_options;
+        std::stringstream ss;
+        ss << file_options.file_path << file_options.success_file_name;
+        std::string file_name = ss.str();
+        if (result_file_sink.storage_backend_type == 
TStorageBackendType::LOCAL) {
+            // For local file writer, the file_path is a local dir.
+            // Here we do a simple security verification by checking whether 
the file exists.
+            // Because the file path is currently arbitrarily specified by the 
user,
+            // Doris is not responsible for ensuring the correctness of the 
path.
+            // This is just to prevent overwriting the existing file.
+            bool exists = true;
+            st = io::global_local_filesystem()->exists(file_name, &exists);
+            if (!st.ok()) {
+                LOG(WARNING) << "outfile write success filefailed, errmsg=" << 
st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+            if (exists) {
+                st = Status::InternalError("File already exists: {}", 
file_name);
+            }
+            if (!st.ok()) {
+                LOG(WARNING) << "outfile write success filefailed, errmsg=" << 
st;
+                st.to_protobuf(result->mutable_status());
+                return;
+            }
+        }
+
+        auto&& res = FileFactory::create_file_writer(
+                
FileFactory::convert_storage_type(result_file_sink.storage_backend_type),
+                ExecEnv::GetInstance(), file_options.broker_addresses,
+                file_options.broker_properties, file_name);
+        using T = std::decay_t<decltype(res)>;
+        if (!res.has_value()) [[unlikely]] {
+            st = std::forward<T>(res).error();
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+
+        std::unique_ptr<doris::io::FileWriter> _file_writer_impl = 
std::forward<T>(res).value();
+        // must write somthing because s3 file writer can not writer empty file
+        st = _file_writer_impl->append({"success"});
+        if (!st.ok()) {
+            LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+        st = _file_writer_impl->close();
+        if (!st.ok()) {
+            LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
+            st.to_protobuf(result->mutable_status());
+            return;
+        }
+    });
+    if (!ret) {
+        offer_failed(result, done, _heavy_work_pool);
+        return;
+    }
+}
+
 void PInternalService::fetch_table_schema(google::protobuf::RpcController* 
controller,
                                           const PFetchTableSchemaRequest* 
request,
                                           PFetchTableSchemaResult* result,
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index bdb91a0bdf7..fdf3a183d96 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -96,6 +96,11 @@ public:
     void fetch_data(google::protobuf::RpcController* controller, const 
PFetchDataRequest* request,
                     PFetchDataResult* result, google::protobuf::Closure* done) 
override;
 
+    void outfile_write_success(google::protobuf::RpcController* controller,
+                               const POutfileWriteSuccessRequest* request,
+                               POutfileWriteSuccessResult* result,
+                               google::protobuf::Closure* done) override;
+
     void fetch_table_schema(google::protobuf::RpcController* controller,
                             const PFetchTableSchemaRequest* request,
                             PFetchTableSchemaResult* result,
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index f61f65b998f..811658afa4d 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -99,37 +99,6 @@ void VFileResultWriter::_init_profile(RuntimeProfile* 
parent_profile) {
     _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", 
TUnit::BYTES);
 }
 
-Status VFileResultWriter::_create_success_file() {
-    std::string file_name;
-    RETURN_IF_ERROR(_get_success_file_name(&file_name));
-    _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
-            FileFactory::convert_storage_type(_storage_type), 
_state->exec_env(),
-            _file_opts->broker_addresses, _file_opts->broker_properties, 
file_name));
-    // must write somthing because s3 file writer can not writer empty file
-    RETURN_IF_ERROR(_file_writer_impl->append({"success"}));
-    return _file_writer_impl->close();
-}
-
-Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
-    std::stringstream ss;
-    ss << _file_opts->file_path << _file_opts->success_file_name;
-    *file_name = ss.str();
-    if (_storage_type == TStorageBackendType::LOCAL) {
-        // For local file writer, the file_path is a local dir.
-        // Here we do a simple security verification by checking whether the 
file exists.
-        // Because the file path is currently arbitrarily specified by the 
user,
-        // Doris is not responsible for ensuring the correctness of the path.
-        // This is just to prevent overwriting the existing file.
-        bool exists = true;
-        RETURN_IF_ERROR(io::global_local_filesystem()->exists(*file_name, 
&exists));
-        if (exists) {
-            return Status::InternalError("File already exists: {}", 
*file_name);
-        }
-    }
-
-    return Status::OK();
-}
-
 Status VFileResultWriter::_create_next_file_writer() {
     std::string file_name;
     RETURN_IF_ERROR(_get_next_file_name(&file_name));
@@ -275,10 +244,6 @@ Status VFileResultWriter::_close_file_writer(bool done) {
         RETURN_IF_ERROR(_create_next_file_writer());
     } else {
         // All data is written to file, send statistic result
-        if (_file_opts->success_file_name != "") {
-            // write success file, just need to touch an empty file
-            RETURN_IF_ERROR(_create_success_file());
-        }
         if (_output_block == nullptr) {
             RETURN_IF_ERROR(_send_result());
         } else {
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
index 29fd6d89cd3..72ba90cd015 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -79,10 +79,8 @@ private:
 
     Status _create_file_writer(const std::string& file_name);
     Status _create_next_file_writer();
-    Status _create_success_file();
     // get next export file name
     Status _get_next_file_name(std::string* file_name);
-    Status _get_success_file_name(std::string* file_name);
     void _get_file_url(std::string* file_url);
     std::string _file_format_to_name();
     // close file writer, and if !done, it will create new writer for next 
file.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 8c975d4ea0d..0314b225e67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -778,6 +778,14 @@ public class OutFileClause {
         return fileFormatType == TFileFormatType.FORMAT_ORC;
     }
 
+    public String getFilePath() {
+        return filePath;
+    }
+
+    public String getSuccessFileName() {
+        return successFileName;
+    }
+
     @Override
     public OutFileClause clone() {
         return new OutFileClause(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
index 72dd6c11e9b..8fdcbb1198d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java
@@ -24,11 +24,14 @@ import org.apache.doris.analysis.Queriable;
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 
+import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -66,6 +69,20 @@ public class LogicalPlanAdapter extends StatementBase 
implements Queriable {
 
     @Override
     public OutFileClause getOutFileClause() {
+        if (logicalPlan instanceof LogicalFileSink) {
+            LogicalFileSink fileSink = (LogicalFileSink) logicalPlan;
+            OutFileClause outFile = new OutFileClause(
+                    fileSink.getFilePath(),
+                    fileSink.getFormat(),
+                    fileSink.getProperties()
+            );
+            try {
+                outFile.analyze(null, Lists.newArrayList(), 
Lists.newArrayList());
+            } catch (Exception e) {
+                throw new AnalysisException(e.getMessage(), e.getCause());
+            }
+            return outFile;
+        }
         return null;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 40400844675..82b7cef3607 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -4086,6 +4086,14 @@ public class Coordinator implements CoordInterface {
         return result;
     }
 
+    public Map<PlanFragmentId, FragmentExecParams> getFragmentExecParamsMap() {
+        return fragmentExecParamsMap;
+    }
+
+    public List<PlanFragment> getFragments() {
+        return fragments;
+    }
+
     // Runtime filter target fragment instance param
     static class FRuntimeFilterTargetParam {
         public TUniqueId targetFragmentInstanceId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ac5e3221355..5bbe01ef793 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -65,6 +65,8 @@ import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.StmtRewriter;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.SwitchStmt;
 import org.apache.doris.analysis.TableName;
@@ -82,6 +84,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
@@ -112,6 +115,7 @@ import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.MetaLockUtils;
+import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.ProfileManager.ProfileType;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
@@ -148,19 +152,26 @@ import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableComma
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
+import org.apache.doris.proto.InternalService.POutfileWriteSuccessRequest;
+import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
 import org.apache.doris.qe.ConnectContext.ConnectType;
+import org.apache.doris.qe.Coordinator.FragmentExecParams;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.cache.Cache;
@@ -168,6 +179,7 @@ import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
+import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.ResultRow;
@@ -185,6 +197,8 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TResultBatch;
+import org.apache.doris.thrift.TResultFileSink;
+import org.apache.doris.thrift.TResultFileSinkOptions;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TSyncLoadForTabletsRequest;
@@ -207,6 +221,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 
 import java.io.IOException;
 import java.io.StringReader;
@@ -220,6 +235,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
@@ -1800,6 +1816,9 @@ public class StmtExecutor {
                         if (!isOutfileQuery) {
                             sendFields(queryStmt.getColLabels(), 
exprToType(queryStmt.getResultExprs()));
                         } else {
+                            if 
(!Strings.isNullOrEmpty(queryStmt.getOutFileClause().getSuccessFileName())) {
+                                
outfileWriteSuccess(queryStmt.getOutFileClause());
+                            }
                             sendFields(OutFileClause.RESULT_COL_NAMES, 
OutFileClause.RESULT_COL_TYPES);
                         }
                         isSendFields = true;
@@ -1857,6 +1876,65 @@ public class StmtExecutor {
         }
     }
 
+    private void outfileWriteSuccess(OutFileClause outFileClause) throws 
Exception {
+        // 1. set TResultFileSinkOptions
+        TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();
+
+        // 2. set brokerNetAddress
+        List<PlanFragment> fragments = coord.getFragments();
+        Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = 
coord.getFragmentExecParamsMap();
+        PlanFragmentId topId = fragments.get(0).getFragmentId();
+        FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
+        DataSink topDataSink = topParams.fragment.getSink();
+        TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
+        if (topDataSink instanceof ResultFileSink
+                && ((ResultFileSink) topDataSink).getStorageType() == 
StorageBackend.StorageType.BROKER) {
+            // set the broker address for OUTFILE sink
+            ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
+            FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
+                    .getBroker(topResultFileSink.getBrokerName(), 
execBeAddr.getHostname());
+            sinkOptions.setBrokerAddresses(Lists.newArrayList(new 
TNetworkAddress(broker.host, broker.port)));
+        }
+
+        // 3. set TResultFileSink properties
+        TResultFileSink sink = new TResultFileSink();
+        sink.setFileOptions(sinkOptions);
+        StorageType storageType = outFileClause.getBrokerDesc() == null
+                ? StorageBackend.StorageType.LOCAL : 
outFileClause.getBrokerDesc().getStorageType();
+        sink.setStorageBackendType(storageType.toThrift());
+
+        // 4. get BE
+        TNetworkAddress address = null;
+        for (Backend be : 
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+            if (be.isAlive()) {
+                address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+                break;
+            }
+        }
+        if (address == null) {
+            throw new AnalysisException("No Alive backends");
+        }
+
+        // 5. send rpc to BE
+        POutfileWriteSuccessRequest request = 
POutfileWriteSuccessRequest.newBuilder()
+                .setResultFileSink(ByteString.copyFrom(new 
TSerializer().serialize(sink))).build();
+        Future<POutfileWriteSuccessResult> future = 
BackendServiceProxy.getInstance()
+                .outfileWriteSuccessAsync(address, request);
+        InternalService.POutfileWriteSuccessResult result = future.get();
+        TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+        String errMsg;
+        if (code != TStatusCode.OK) {
+            if (!result.getStatus().getErrorMsgsList().isEmpty()) {
+                errMsg = result.getStatus().getErrorMsgsList().get(0);
+            } else {
+                errMsg = "Outfile write success file failed. backend address: "
+                        + NetUtils
+                        .getHostPortInAccessibleFormat(address.getHostname(), 
address.getPort());
+            }
+            throw new AnalysisException(errMsg);
+        }
+    }
+
     private void handleTransactionStmt() throws Exception {
         if (context.getConnectType() == ConnectType.MYSQL) {
             // Every time set no send flag and clean all data in buffer
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 3f4bb846767..50afc7c96bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -106,6 +106,11 @@ public class BackendServiceClient {
         return stub.fetchArrowFlightSchema(request);
     }
 
+    public Future<InternalService.POutfileWriteSuccessResult> 
outfileWriteSuccessAsync(
+            InternalService.POutfileWriteSuccessRequest request) {
+        return stub.outfileWriteSuccess(request);
+    }
+
     public Future<InternalService.PFetchTableSchemaResult> 
fetchTableStructureAsync(
             InternalService.PFetchTableSchemaRequest request) {
         return stub.fetchTableSchema(request);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 5c8251f5e12..e541b0eb689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -325,6 +325,18 @@ public class BackendServiceProxy {
         }
     }
 
+    public Future<InternalService.POutfileWriteSuccessResult> 
outfileWriteSuccessAsync(TNetworkAddress address,
+            InternalService.POutfileWriteSuccessRequest request)  throws 
RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.outfileWriteSuccessAsync(request);
+        } catch (Throwable e) {
+            LOG.warn("outfile write success file catch a exception, 
address={}:{}",
+                    address.getHostname(), address.getPort(), e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
     public Future<InternalService.PFetchTableSchemaResult> 
fetchTableStructureAsync(
             TNetworkAddress address, InternalService.PFetchTableSchemaRequest 
request) throws RpcException {
         try {
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index b6579323aae..1a6dad5521b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -691,6 +691,17 @@ message PFetchTableSchemaResult {
   repeated PTypeDesc column_types = 4;
 }
 
+message POutfileWriteSuccessRequest {
+    // optional string file_path = 1;
+    // optional string success_file_name = 2;
+    // map<string, string> broker_properties = 4; // only for remote file
+    optional bytes result_file_sink = 1;
+}
+
+message POutfileWriteSuccessResult {
+    optional PStatus status = 1;
+}
+
 message PJdbcTestConnectionRequest {
     optional bytes jdbc_table = 1;
     optional int32 jdbc_table_type = 2;
@@ -954,6 +965,7 @@ service PBackendService {
     rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);
     rpc request_slave_tablet_pull_rowset(PTabletWriteSlaveRequest) returns 
(PTabletWriteSlaveResult);
     rpc response_slave_tablet_pull_rowset(PTabletWriteSlaveDoneRequest) 
returns (PTabletWriteSlaveDoneResult);
+    rpc outfile_write_success(POutfileWriteSuccessRequest) returns 
(POutfileWriteSuccessResult);
     rpc fetch_table_schema(PFetchTableSchemaRequest) returns 
(PFetchTableSchemaResult);
     rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse);
     rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns 
(PGetFileCacheMetaResponse);
diff --git a/regression-test/suites/export_p0/test_outfile.groovy 
b/regression-test/suites/export_p0/test_outfile.groovy
index 76c5bb688c4..8b60803e185 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -210,9 +210,8 @@ suite("test_outfile") {
                     (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
         sql "set enable_parallel_outfile = true;"
         sql """select * from select_into_file into outfile 
"file://${outFilePath}/";"""
-        // TODO: parallel outfile is not compatible with success_file_name. 
remove this case temporary
-        // sql "set enable_parallel_outfile = true;"
-        // sql """select * from select_into_file into outfile 
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+
+        sql """select * from select_into_file into outfile 
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
     } finally {
         try_sql("DROP TABLE IF EXISTS select_into_file")
         File path = new File(outFilePath)
diff --git a/regression-test/suites/nereids_p0/outfile/test_outfile.groovy 
b/regression-test/suites/nereids_p0/outfile/test_outfile.groovy
index 1cfdd7b62ce..f256df91809 100644
--- a/regression-test/suites/nereids_p0/outfile/test_outfile.groovy
+++ b/regression-test/suites/nereids_p0/outfile/test_outfile.groovy
@@ -236,9 +236,9 @@ suite("test_outfile") {
                     (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
         sql "set enable_parallel_outfile = true;"
         sql """select * from select_into_file into outfile 
"file://${outFilePath}/";"""
-        // TODO: parallel outfile is not compatible with success_file_name. 
remove this case temporary
-        // sql "set enable_parallel_outfile = true;"
-        // sql """select * from select_into_file into outfile 
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+
+        sql "set enable_parallel_outfile = true;"
+        sql """select * from select_into_file into outfile 
"file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
     } finally {
         try_sql("DROP TABLE IF EXISTS select_into_file")
         File path = new File(outFilePath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to