Unsubscribe
At 2023-06-20 11:16:18, "Yanfei Lei" <fredia...@gmail.com> wrote:
>Hi,
>
>Judging from ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
>taskOffHeapMemory=1024.000gb (1099511627776 bytes),
>managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb
>(67108864 bytes)}, numberOfRequiredSlots=1}], the sink node is trying to
>request 1 TB of heap memory and 1 TB of off-heap memory. It is worth
>double-checking whether any memory-size parameters are set in your code or
>in flink-conf [1].
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-process-size
>
>Best,
>Yanfei
>
>Shammon FY <zjur...@gmail.com> wrote on Tue, Jun 20, 2023 at 08:45:
>>
>> Hi,
>>
>> Did you implement this Doris sink yourself, or is it the one officially provided by Flink or Doris? From the error, it looks like the sink node requested an extremely large amount of memory. You could check whether something is wrong there, or whether there is a configuration option that controls it.
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞 <guoxin...@betalpha.com.invalid> wrote:
>>
>> > While testing a job in my IDE, the job stayed in the CREATED state, and after a long time it failed with the following error:
>> >
>> > DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum
>> > required resources, failing slot requests. Acquired:
>> > [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
>> > (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
>> > managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
>> > bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered
>> > TMs: 1, registered slots: 1 free slots: 0
>> > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> > Could not acquire the minimum required resources.
>> >
>> > After some investigation, the most suspicious part is a Doris RowData stream load sink: when I commented it out and switched to a sink that writes to a local file, the job ran normally.
>> > This is my Doris sink code; the Flink Doris connector version is 1.1.1.
>> > DorisSink.Builder<RowData> builder = DorisSink.builder();
>> > DorisOptions.Builder dorisBuilder = DorisOptions.builder();
>> > dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
>> >         .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data"))
>> >         .setUsername(parameterTool.get("doris.user"))
>> >         .setPassword(parameterTool.get("doris.password"));
>> >
>> > Properties pro = new Properties();
>> > pro.setProperty("format", "json");
>> > pro.setProperty("read_json_by_line", "true");
>> >
>> > Date date = new Date();
>> > DorisExecutionOptions.Builder executionBuilder =
>> >         DorisExecutionOptions.builder();
>> >
>> > executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro);
>> >
>> > String[] fields =
>> >         {"uid","subject","trade_date","update_time","value"};
>> > DataType[] types =
>> >         {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()};
>> >
>> > builder.setDorisReadOptions(DorisReadOptions.builder().build())
>> >         .setDorisExecutionOptions(executionBuilder.build())
>> >         .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build())
>> >         .setDorisOptions(dorisBuilder.build());
>> > fundCategoryDataStream.sinkTo(builder.build())
>> >         .slotSharingGroup(parameterTool.get("fund_category_data_sink_group",
>> >                 "fund_category_sink"))
>> >         .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
>> >         .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1"))
>> >         .name("fundCategorySinkName");
>> >
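
As a side check on the figures quoted in the error message in this thread, the raw byte counts in the ResourceProfile can be converted by hand to confirm the "1 TB" reading. The following is only a minimal sketch in plain Java; the class ResourceProfileCheck and its methods are hypothetical helpers written for this illustration and are not part of Flink or the Doris connector.

```java
// Illustrative helper (not part of Flink): converts the raw byte counts
// printed in the ResourceProfile into binary units for a quick sanity check.
class ResourceProfileCheck {
    static final long BYTES_PER_MIB = 1024L * 1024L;
    static final long BYTES_PER_GIB = 1024L * BYTES_PER_MIB;

    // Bytes to gibibytes (Flink prints this unit as "gb" in the log).
    static double bytesToGib(long bytes) {
        return (double) bytes / BYTES_PER_GIB;
    }

    // Bytes to mebibytes (Flink prints this unit as "mb" in the log).
    static double bytesToMib(long bytes) {
        return (double) bytes / BYTES_PER_MIB;
    }

    public static void main(String[] args) {
        // Byte values copied from the error message in the thread.
        System.out.println("taskHeapMemory    = " + bytesToGib(1099511627776L) + " GiB");
        System.out.println("taskOffHeapMemory = " + bytesToGib(1099511627776L) + " GiB");
        System.out.println("managedMemory     = " + bytesToMib(134217728L) + " MiB");
        System.out.println("networkMemory     = " + bytesToMib(67108864L) + " MiB");
    }
}
```

The heap and off-heap values are each exactly 2^40 bytes, i.e. 1024 GiB, which matches the "1 TB" figure in the reply; the managed and network values come out to 128 MiB and 64 MiB.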