Re: Flink 1.14 requesting extremely large memory

2023-06-19 Thread Yanfei Lei
Hi,

Looking at ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
taskOffHeapMemory=1024.000gb (1099511627776 bytes),
managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb
(67108864 bytes)}, numberOfRequiredSlots=1}], the sink node is trying to
request 1 TB of heap memory and 1 TB of off-heap memory. Please double-check
whether any memory-size-related options are set in your code or in flink-conf [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-process-size
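For reference, these are the flink-conf.yaml options behind the sizes shown in the ResourceProfile; the values below are purely illustrative, not recommendations:

```yaml
# Illustrative only -- Flink expects you to pick one sizing scheme,
# not to set all of these at once.
taskmanager.memory.process.size: 4g            # coarse: total TM process memory
# or the fine-grained options, which map onto the ResourceProfile fields:
# taskmanager.memory.task.heap.size: 1g        # -> taskHeapMemory
# taskmanager.memory.task.off-heap.size: 128m  # -> taskOffHeapMemory
# taskmanager.memory.managed.size: 128m        # -> managedMemory
```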

Best,
Yanfei

Shammon FY wrote on Tue, Jun 20, 2023 at 08:45:
>
> Hi,
>
> Is this Doris sink something you implemented yourself, or one provided
> officially by Flink or Doris? From the error, it looks like the sink node
> requested an extremely large amount of memory. Please check whether that is
> a bug, or whether there is a configuration option controlling it.
>
> Best,
> Shammon FY
>
> On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞  wrote:
>
> > When I test a job in the IDE, it stays in CREATED state for a long time
> > and then fails with the following error:
> >
> > DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum
> > required resources, failing slot requests. Acquired:
> > [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
> > (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
> > managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
> > bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered
> > TMs: 1, registered slots: 1 free slots: 0
> > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Could not acquire the minimum required resources.
> >
> > After some troubleshooting, the most suspicious part is a Doris RowData
> > stream-load sink: if I comment it out and replace it with a sink that
> > writes to a local file, the job runs normally.
> > Here is my Doris sink code; the flink-doris-connector version is 1.1.1:
> > DorisSink.Builder builder = DorisSink.builder();
> > DorisOptions.Builder dorisBuilder = DorisOptions.builder();
> > dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
> >         .setTableIdentifier(parameterTool.get("doris.sfinx_database") + "."
> >                 + parameterTool.get("doris.table.asset_tag_data", "asset_tag_data"))
> >         .setUsername(parameterTool.get("doris.user"))
> >         .setPassword(parameterTool.get("doris.password"));
> >
> > Properties pro = new Properties();
> > pro.setProperty("format", "json");
> > pro.setProperty("read_json_by_line", "true");
> >
> > Date date = new Date();
> > DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
> > executionBuilder.setLabelPrefix("FundCategoryFilter-doris" + date.getTime())
> >         .setStreamLoadProp(pro);
> >
> > String[] fields = {"uid", "subject", "trade_date", "update_time", "value"};
> > DataType[] types = {DataTypes.VARCHAR(36), DataTypes.VARCHAR(20),
> >         DataTypes.DATE(), DataTypes.TIMESTAMP(), DataTypes.DOUBLE()};
> >
> > builder.setDorisReadOptions(DorisReadOptions.builder().build())
> >         .setDorisExecutionOptions(executionBuilder.build())
> >         .setSerializer(RowDataSerializer.builder()
> >                 .setFieldNames(fields)
> >                 .setType("json")
> >                 .setFieldType(types)
> >                 .build())
> >         .setDorisOptions(dorisBuilder.build());
> >
> > fundCategoryDataStream.sinkTo(builder.build())
> >         .slotSharingGroup(parameterTool.get("fund_category_data_sink_group",
> >                 "fund_category_sink"))
> >         .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
> >         .uid(parameterTool.get("fundCategroyDataSinkID", "fundCategroyDataSinkID_1"))
> >         .name("fundCategorySinkName");
> >
> >
> >


Re: Flink migration 1.15 to 1.17 - OS version change

2023-06-19 Thread Chesnay Schepler
The openjdk images are deprecated, which prevented us from releasing our 
docker images via some distribution channels.


https://issues.apache.org/jira/browse/FLINK-29137

When we switched to the Temurin images we didn't have to change a whole
lot, so you might be able to reconstruct an openjdk-based image for Flink
1.17.

https://github.com/apache/flink-docker/commit/3c259f46231b97202925a111a8205193c15bbf78

In theory you should be able to take the existing Dockerfile and change
the base without having to change anything else.
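As a sketch of that approach (assuming, as suggested, that the base image is the only line that needs to change in the upstream apache/flink-docker Dockerfile; the openjdk tag below is illustrative):

```dockerfile
# Sketch: rebuild the Flink 1.17 image on an openjdk base.
# Take the upstream apache/flink-docker Dockerfile for 1.17 and swap
# only the base image; everything else stays as upstream ships it.

# was: FROM eclipse-temurin:11-jre-jammy
FROM openjdk:11-jre

# ... remainder of the upstream Dockerfile, unchanged
# (ENV FLINK_HOME, gosu installation, Flink tarball download, etc.)
```

Note that the openjdk images are deprecated and no longer updated, so this trades the newer base OS for a frozen one.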


On 19/06/2023 11:11, Arek Dankiewicz wrote:

Hello All,
We recently wanted to migrate our Flink Docker image from 1.15.1
to 1.17.1 and ran into a problem: our infrastructure cannot
correctly handle an image built on Ubuntu 22+, because it runs an
older version of Docker.
Unfortunately, this is out of our control, and we would like to change
the OS on which the Flink 1.17 image is built from
eclipse-temurin:11-jre-jammy to e.g. openjdk:11.


I would like to know whether switching back to Debian for 1.17 would
be a disruptive change, and what caused the OS change between 1.15
and 1.16.


Kindest regards,
Arkadiusz



