Hi,

这个doris的sink是你自己实现的还是flink或者doris官方提供的?从错误来看,像是sink节点申请了超大的内存资源,你可以确认一下是否有问题,或者是否有配置项可以配置

Best,
Shammon FY

On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞 <guoxin...@betalpha.com.invalid> wrote:

> 我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错
>
> DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum
> required resources, failing slot requests. Acquired:
> [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
> (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
> managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
> bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered
> TMs: 1, registered slots: 1 free slots: 0
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not acquire the minimum required resources.
>
> 我排查了一下发现最可疑的是用了一个doris的rowdata streamload sink,将其注释换了一个写入本地文件的sink就可以正常运行了
> 这是我doris sink的代码,flink doris connector版本是1.1.1
> DorisSink.Builder<RowData> builder = DorisSink.builder();
>         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
>         dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
>
> .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data"))
>                 .setUsername(parameterTool.get("doris.user"))
>                 .setPassword(parameterTool.get("doris.password"));
>
>         Properties pro = new Properties();
>         pro.setProperty("format", "json");
>         pro.setProperty("read_json_by_line", "true");
>
>         Date date = new Date();
>         DorisExecutionOptions.Builder executionBuilder =
> DorisExecutionOptions.builder();
>
> executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro);
>
>         String[] fields =
> {"uid","subject","trade_date","update_time","value"};
>         DataType[] types =
> {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()};
>
>         builder.setDorisReadOptions(DorisReadOptions.builder().build())
>                 .setDorisExecutionOptions(executionBuilder.build())
>
> .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build())
>                 .setDorisOptions(dorisBuilder.build());
>         fundCategoryDataStream.sinkTo(builder.build())
>
> .slotSharingGroup(parameterTool.get("fund_category_data_sink_group",
> "fund_category_sink"))
>
> .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
>
> .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1"))
>                 .name("fundCategorySinkName”);
>
>
>

回复