Hi, 这个doris的sink是你自己实现的还是flink或者doris官方提供的?从错误来看,像是sink节点申请了超大的内存资源,你可以确认一下是否有问题,或者是否有配置项可以配置
Best, Shammon FY On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞 <guoxin...@betalpha.com.invalid> wrote: > 我在ide里测试一个任务的时候,任务一直处于created状态,过了很久之后报了以下的错 > > DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum > required resources, failing slot requests. Acquired: > [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb > (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), > managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 > bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered > TMs: 1, registered slots: 1 free slots: 0 > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Could not acquire the minimum required resources. > > 我排查了一下发现最可疑的是用了一个doris的rowdata streamload sink,将其注释换了一个写入本地文件的sink就可以正常运行了 > 这是我doris sink的代码,flink doris connector版本是1.1.1 > DorisSink.Builder<RowData> builder = DorisSink.builder(); > DorisOptions.Builder dorisBuilder = DorisOptions.builder(); > dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP")) > > .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data")) > .setUsername(parameterTool.get("doris.user")) > .setPassword(parameterTool.get("doris.password")); > > Properties pro = new Properties(); > pro.setProperty("format", "json"); > pro.setProperty("read_json_by_line", "true"); > > Date date = new Date(); > DorisExecutionOptions.Builder executionBuilder = > DorisExecutionOptions.builder(); > > executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro); > > String[] fields = > {"uid","subject","trade_date","update_time","value"}; > DataType[] types = > {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()}; > > builder.setDorisReadOptions(DorisReadOptions.builder().build()) > .setDorisExecutionOptions(executionBuilder.build()) > > .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build()) > .setDorisOptions(dorisBuilder.build()); > fundCategoryDataStream.sinkTo(builder.build()) > > .slotSharingGroup(parameterTool.get("fund_category_data_sink_group", > "fund_category_sink")) > > .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1)) > > .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1")) > .name("fundCategorySinkName”); > > >