While testing a job in my IDE, the job stayed in the CREATED state, and after a long time it failed with the following error:
DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum required resources, failing slot requests. Acquired: [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs: 1, registered slots: 1 free slots: 0
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

After some investigation, the most suspicious part is a Doris RowData Stream Load sink: when I comment it out and replace it with a sink that writes to a local file, the job runs normally.

Here is my Doris sink code. The Flink Doris connector version is 1.1.1.

    DorisSink.Builder<RowData> builder = DorisSink.builder();

    // Connection options: FE nodes, target table, credentials
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
            .setTableIdentifier(parameterTool.get("doris.sfinx_database") + "."
                    + parameterTool.get("doris.table.asset_tag_data", "asset_tag_data"))
            .setUsername(parameterTool.get("doris.user"))
            .setPassword(parameterTool.get("doris.password"));

    // Stream Load properties: JSON format, one record per line
    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("read_json_by_line", "true");

    // Label prefix includes the current timestamp
    Date date = new Date();
    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("FundCategoryFilter-doris" + date.getTime())
            .setStreamLoadProp(pro);

    String[] fields = {"uid", "subject", "trade_date", "update_time", "value"};
    DataType[] types = {DataTypes.VARCHAR(36), DataTypes.VARCHAR(20), DataTypes.DATE(),
            DataTypes.TIMESTAMP(), DataTypes.DOUBLE()};

    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setSerializer(RowDataSerializer.builder()
                    .setFieldNames(fields)
                    .setType("json")
                    .setFieldType(types)
                    .build())
            .setDorisOptions(dorisBuilder.build());

    // The sink is placed in its own slot sharing group
    fundCategoryDataStream.sinkTo(builder.build())
            .slotSharingGroup(parameterTool.get("fund_category_data_sink_group", "fund_category_sink"))
            .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
            .uid(parameterTool.get("fundCategroyDataSinkID", "fundCategroyDataSinkID_1"))
            .name("fundCategorySinkName");