Re: Flink 1.14 requesting an extremely large amount of memory
I looked into this further: the job was actually running in local mode, and I had never configured the number of slots for local mode, so there were not enough slots; the 1024 GB is in fact a default value, which is why the strange "needs 1 TB of memory" error appeared. The problem never showed up before because local mode used to allocate enough slots automatically, but for some unknown reason the flink-doris-connector is not counted into the slot demand, so the job ended up one slot short of its requirement.

> On Jun 19, 2023, at 16:18, 郭欣瑞 wrote:
>
> While testing a job in the IDE, the job stayed in the CREATED state for a
> long time and then failed with the following error:
>
> DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum required
> resources, failing slot requests. Acquired:
> [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
> (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
> managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
> bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs:
> 1, registered slots: 1 free slots: 0
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not acquire the minimum required resources.
>
> [...]
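For reference, the fix described above (giving the local environment enough slots) can be sketched roughly as follows. This is a minimal sketch, not the poster's actual code; the slot count of 4 is an arbitrary example value:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// In local (IDE) mode the embedded MiniCluster only offers the configured
// number of slots, so make sure there are enough for every slot-sharing group.
Configuration conf = new Configuration();
conf.set(TaskManagerOptions.NUM_TASK_SLOTS, 4); // example value
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(conf);
```

Since the job uses a dedicated slot-sharing group for the sink ("fund_category_sink"), the local slot count needs to cover that group in addition to the default one.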
Re: Re: Flink 1.14 requesting an extremely large amount of memory
Unsubscribe

On 2023-06-20 11:16:18, "Yanfei Lei" wrote:
> [...]
Re: Flink 1.14 requesting an extremely large amount of memory
Hi,

Judging from ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, numberOfRequiredSlots=1}], the sink node is trying to request 1 TB of heap memory and 1 TB of off-heap memory. It would be worth additionally checking whether any memory-size parameters [1] are configured in the code or in flink-conf.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-process-size

Best,
Yanfei

On Tue, Jun 20, 2023 at 08:45, Shammon FY wrote:
> [...]
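For anyone following the advice in [1] above: the relevant keys live in flink-conf.yaml. The values below are illustrative examples only, not settings taken from this thread:

```yaml
# flink-conf.yaml -- example values only
taskmanager.memory.process.size: 4096m  # total process memory of one TaskManager
taskmanager.numberOfTaskSlots: 4        # slots offered per TaskManager
```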
Re: Flink 1.14 requesting an extremely large amount of memory
Hi,

Is this Doris sink something you implemented yourself, or the one officially provided by Flink or Doris? From the error, it looks like the sink node is requesting an extremely large amount of memory; you could check whether something is wrong there, or whether there is a configuration option for it.

Best,
Shammon FY

On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞 wrote:
> [...]
Flink 1.14 requesting an extremely large amount of memory
While testing a job in the IDE, the job stayed in the CREATED state for a long time and then failed with the following error:

DeclarativeSlotPoolBridge.java:351 - Could not acquire the minimum required resources, failing slot requests. Acquired: [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes), managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864 bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs: 1, registered slots: 1 free slots: 0
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

After some digging, the most suspicious part is a Doris RowData Stream Load sink; commenting it out and replacing it with a sink that writes to a local file makes the job run normally.
This is my Doris sink code; the flink-doris-connector version is 1.1.1:

    DorisSink.Builder builder = DorisSink.builder();
    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
            .setTableIdentifier(parameterTool.get("doris.sfinx_database") + "."
                    + parameterTool.get("doris.table.asset_tag_data", "asset_tag_data"))
            .setUsername(parameterTool.get("doris.user"))
            .setPassword(parameterTool.get("doris.password"));

    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("read_json_by_line", "true");

    Date date = new Date();
    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("FundCategoryFilter-doris" + date.getTime())
            .setStreamLoadProp(pro);

    String[] fields = {"uid", "subject", "trade_date", "update_time", "value"};
    DataType[] types = {DataTypes.VARCHAR(36), DataTypes.VARCHAR(20),
            DataTypes.DATE(), DataTypes.TIMESTAMP(), DataTypes.DOUBLE()};

    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setSerializer(RowDataSerializer.builder()
                    .setFieldNames(fields)
                    .setType("json")
                    .setFieldType(types)
                    .build())
            .setDorisOptions(dorisBuilder.build());

    fundCategoryDataStream.sinkTo(builder.build())
            .slotSharingGroup(parameterTool.get("fund_category_data_sink_group", "fund_category_sink"))
            .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
            .uid(parameterTool.get("fundCategroyDataSinkID", "fundCategroyDataSinkID_1"))
            .name("fundCategorySinkName");
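As a sanity check on the numbers in the log line (not part of the original message): 1099511627776 bytes is exactly 1024 GiB, i.e. 2^40 bytes, which is consistent with the later finding in this thread that 1024 GB is a default placeholder rather than a deliberately configured value:

```java
public class ByteCheck {
    public static void main(String[] args) {
        long reported = 1_099_511_627_776L;        // bytes from the ResourceProfile log
        long oneGiB = 1024L * 1024L * 1024L;       // 2^30 bytes
        System.out.println(reported / oneGiB);     // prints 1024
        System.out.println(reported == 1L << 40);  // prints true: exactly 2^40 bytes
    }
}
```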