Re: Flink 1.18.1, state recovery on restart

2024-05-16 Posted by Yanfei Lei
This looks like the same problem as FLINK-34063 / FLINK-33863; you could try upgrading to 1.18.2.
[1] https://issues.apache.org/jira/browse/FLINK-33863
[2] https://issues.apache.org/jira/browse/FLINK-34063
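For context, the SnappyFramedInputStream frames in the trace below come from snapshot compression, which is controlled by a single flag. A minimal sketch of where it is set (the key execution.checkpointing.snapshot-compression is standard Flink configuration; whether toggling it helps with an already-written snapshot is an assumption, not verified here):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Enables snappy compression of checkpoint/savepoint state; this is the code
// path whose restore side FLINK-33863 / FLINK-34063 fix.
conf.setString("execution.checkpointing.snapshot-compression", "true");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);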

陈叶超 wrote on Thu, May 16, 2024 at 16:38:
>
> After upgrading to Flink 1.18.1, restoring state on job restart hits the following error:
> 2024-04-09 13:03:48
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:258)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for 
> RowDataStoreWriteOperator_8d96fc510e75de3baf03ef7367db7d42_(2/2) from any of 
> the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:289)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:176)
> ... 11 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore operator state backend
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:88)
> at 
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:533)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:380)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:280)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 13 more
> Caused by: java.io.IOException: invalid stream header
> at 
> org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:235)
> at 
> org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:145)
> at 
> org.xerial.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:129)
> at 
> org.apache.flink.runtime.state.SnappyStreamCompressionDecorator.decorateWithCompression(SnappyStreamCompressionDecorator.java:53)
> at 
> org.apache.flink.runtime.state.StreamCompressionDecorator.decorateWithCompression(StreamCompressionDecorator.java:60)
> at 
> org.apache.flink.runtime.state.CompressibleFSDataInputStream.<init>(CompressibleFSDataInputStream.java:39)
> at 
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:185)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:85)
> ... 18 more
>


-- 
Best,
Yanfei


Re: Re: java.io.IOException: Could not load the native RocksDB library

2024-05-06 Posted by Yanfei Lei
> ... Sink: Print to Std. Out (2/4)#0
> (2500c455c9c458780199da504300da05_90bea66de1c231edf33913ecd54406c1_1_0) 
> switched from INITIALIZING to FAILED with failure cause:
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:266)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:799)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:753)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:753)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:712)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-runtime-1.19.0.jar:1.19.0]
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) 
> [flink-runtime-1.19.0.jar:1.19.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) 
> [flink-runtime-1.19.0.jar:1.19.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> [flink-runtime-1.19.0.jar:1.19.0]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_90bea66de1c231edf33913ecd54406c1_(2/4) from 
> any of the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>  ~[flink-streaming-java-1.19.0.jar:1.19.0]
> ... 12 more
>
>
>
> From: Yanfei Lei
> Date: 2024-05-07 10:07
> To: user-zh
> Subject: Re: java.io.IOException: Could not load the native RocksDB library
> What development environment is this? Windows?
> Could you share a more detailed error, e.g. which .dll is not found?
>
> ha.fen...@aisino.com wrote on Tue, May 7, 2024 at 09:34:
> >
> > Configuration config = new Configuration();
> > config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
> > config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
> > config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:\\d:\\cdc");
> >
> > Running with the Flink 1.17 jars in the dev environment works fine
> > Running with the Flink 1.19 jars in the dev environment reports:
> >
> > java.io.IOException: Could not load the native RocksDB library
>
>
>
> --
> Best,
> Yanfei



-- 
Best,
Yanfei


Re: java.io.IOException: Could not load the native RocksDB library

2024-05-06 Posted by Yanfei Lei
What development environment is this? Windows?
Could you share a more detailed error, e.g. which .dll is not found?

ha.fen...@aisino.com wrote on Tue, May 7, 2024 at 09:34:
>
> Configuration config = new Configuration();
> config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
> config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
> config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:\\d:\\cdc");
>
> Running with the Flink 1.17 jars in the dev environment works fine
> Running with the Flink 1.19 jars in the dev environment reports:
>
> java.io.IOException: Could not load the native RocksDB library
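As an aside, CheckpointingOptions.CHECKPOINTS_DIRECTORY expects a URI, and the backslash form above is not a valid file: URI (this is separate from the native-library load failure itself). A corrected sketch of the same configuration, with only the URI form changed:

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;

Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
// file: URIs use forward slashes, even on Windows
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///d:/cdc");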



-- 
Best,
Yanfei


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Posted by Yanfei Lei
Congrats, thanks for the great work!

Sergey Nuyanzin wrote on Mon, Mar 18, 2024 at 19:30:
>
> Congratulations, thanks release managers and everyone involved for the great 
> work!
>
> On Mon, Mar 18, 2024 at 12:15 PM Benchao Li  wrote:
>>
>> Congratulations! And thanks to all release managers and everyone
>> involved in this release!
>>
>> Yubin Li wrote on Mon, Mar 18, 2024 at 18:11:
>> >
>> > Congratulations!
>> >
>> > Thanks to release managers and everyone involved.
>> >
>> > On Mon, Mar 18, 2024 at 5:55 PM Hangxiang Yu  wrote:
>> > >
>> > > Congratulations!
>> > > Thanks release managers and all involved!
>> > >
>> > > On Mon, Mar 18, 2024 at 5:23 PM Hang Ruan  wrote:
>> > >
>> > > > Congratulations!
>> > > >
>> > > > Best,
>> > > > Hang
>> > > >
>> > > > Paul Lam wrote on Mon, Mar 18, 2024 at 17:18:
>> > > >
>> > > > > Congrats! Thanks to everyone involved!
>> > > > >
>> > > > > Best,
>> > > > > Paul Lam
>> > > > >
>> > > > > > > On Mar 18, 2024, at 16:37, Samrat Deb wrote:
>> > > > > >
>> > > > > > Congratulations !
>> > > > > >
>> > > > > > On Mon, 18 Mar 2024 at 2:07 PM, Jingsong Li 
>> > > > > > 
>> > > > > wrote:
>> > > > > >
>> > > > > >> Congratulations!
>> > > > > >>
>> > > > > >> On Mon, Mar 18, 2024 at 4:30 PM Rui Fan <1996fan...@gmail.com> 
>> > > > > >> wrote:
>> > > > > >>>
>> > > > > >>> Congratulations, thanks for the great work!
>> > > > > >>>
>> > > > > >>> Best,
>> > > > > >>> Rui
>> > > > > >>>
>> > > > > >>> On Mon, Mar 18, 2024 at 4:26 PM Lincoln Lee 
>> > > > > >>> 
>> > > > > >> wrote:
>> > > > > 
>> > > > >  The Apache Flink community is very happy to announce the 
>> > > > >  release of
>> > > > > >> Apache Flink 1.19.0, which is the first release for the Apache 
>> > > > > >> Flink
>> > > > > 1.19
>> > > > > >> series.
>> > > > > 
>> > > > >  Apache Flink® is an open-source stream processing framework for
>> > > > > >> distributed, high-performing, always-available, and accurate data
>> > > > > streaming
>> > > > > >> applications.
>> > > > > 
>> > > > >  The release is available for download at:
>> > > > >  https://flink.apache.org/downloads.html
>> > > > > 
>> > > > >  Please check out the release blog post for an overview of the
>> > > > > >> improvements for this release:
>> > > > > 
>> > > > > >>
>> > > > >
>> > > > https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>> > > > > 
>> > > > >  The full release notes are available in Jira:
>> > > > > 
>> > > > > >>
>> > > > >
>> > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353282
>> > > > > 
>> > > > >  We would like to thank all contributors of the Apache Flink
>> > > > community
>> > > > > >> who made this release possible!
>> > > > > 
>> > > > > 
>> > > > >  Best,
>> > > > >  Yun, Jing, Martijn and Lincoln
>> > > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > Best,
>> > > Hangxiang.
>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>
>
>
> --
> Best regards,
> Sergey



-- 
Best,
Yanfei


Re: Flink window trigger conditions

2023-08-09 Posted by Yanfei Lei
hi,
This looks similar to the problem in [1]: an event-time window fires in onElement and onEventTime, and both methods decide based on the watermark. You can look at the o.a.f.table.runtime.operators.window.triggers and o.a.f.table.runtime.operators.wmassigners packages.

[1] https://juejin.cn/post/6850418110010179597
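To make the watermark dependence concrete, here is a minimal runnable sketch (toy records, not from the original question): the 2-minute window only fires once a later element advances the watermark past the window end.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                // (key, event-time millis, value)
                Tuple3.of("k", 0L, 1),
                Tuple3.of("k", 60_000L, 1),
                // in an unbounded job, this later element is what pushes the
                // watermark past the [0, 120s) window end and fires it (with a
                // bounded demo input, the final watermark at end-of-input
                // would also fire it)
                Tuple3.of("k", 180_000L, 1))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Long, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((e, ts) -> e.f1))
            .keyBy(e -> e.f0)
            .window(TumblingEventTimeWindows.of(Time.minutes(2)))
            .sum(2)
            .print();
        env.execute("event-time window demo");
    }
}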

小昌同学 wrote on Thu, Aug 10, 2023 at 10:52:
>
> Hello everyone. When using Flink's event-time windows, I have a question about the trigger condition.
> I opened a 2-minute event-time window, but after two minutes the window did not fire on its own; only when I later sent another record did the window fire.
> So is the window trigger not a point in time, but instead dependent on the next record arriving with a timestamp greater than the window end, and only then does the previous window actually fire?
> Please advise.
>
>
>
>
> 小昌同学
> ccc0606fight...@163.com



-- 
Best,
Yanfei


Re: Flink 1.14 requesting enormous memory

2023-06-19 Posted by Yanfei Lei
Hi,

Judging from ResourceProfile{taskHeapMemory=1024.000gb (1099511627776 bytes),
taskOffHeapMemory=1024.000gb (1099511627776 bytes),
managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb
(67108864 bytes)}, numberOfRequiredSlots=1}], the sink node wants to request 1 TB of heap
memory and 1 TB of off-heap memory. You could additionally check whether any memory-size
parameters are configured in the code or in flink-conf [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-memory-process-size
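If nothing in the job code explains it, these are the flink-conf.yaml knobs the reply refers to; a sketch with purely illustrative values (fine-grained resource requirements set in code can also produce such ResourceProfile requests):

taskmanager.memory.process.size: 4g
# or, fine-grained:
taskmanager.memory.task.heap.size: 1g
taskmanager.memory.task.off-heap.size: 128m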

Best,
Yanfei

Shammon FY wrote on Tue, Jun 20, 2023 at 08:45:
>
> Hi,
>
> Is this Doris sink your own implementation, or one provided officially by Flink or Doris? From the error, it looks like the sink node requested an enormous amount of memory; you could check whether something is wrong there, or whether a configuration option controls it.
>
> Best,
> Shammon FY
>
> On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞  wrote:
>
> > When I test a job in the IDE, it stays in CREATED state, and after a long while it reports the following error:
> >
> > DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum
> > required resources, failing slot requests. Acquired:
> > [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
> > (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
> > managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
> > bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered
> > TMs: 1, registered slots: 1 free slots: 0
> > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Could not acquire the minimum required resources.
> >
> > After some digging, the most suspicious part is a Doris RowData stream-load sink; after commenting it out and swapping in a sink that writes to a local file, the job runs normally.
> > Here is my Doris sink code; the flink-doris-connector version is 1.1.1:
> > DorisSink.Builder builder = DorisSink.builder();
> > DorisOptions.Builder dorisBuilder = DorisOptions.builder();
> > dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
> >
> > .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data"))
> > .setUsername(parameterTool.get("doris.user"))
> > .setPassword(parameterTool.get("doris.password"));
> >
> > Properties pro = new Properties();
> > pro.setProperty("format", "json");
> > pro.setProperty("read_json_by_line", "true");
> >
> > Date date = new Date();
> > DorisExecutionOptions.Builder executionBuilder =
> > DorisExecutionOptions.builder();
> >
> > executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro);
> >
> > String[] fields =
> > {"uid","subject","trade_date","update_time","value"};
> > DataType[] types =
> > {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()};
> >
> > builder.setDorisReadOptions(DorisReadOptions.builder().build())
> > .setDorisExecutionOptions(executionBuilder.build())
> >
> > .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build())
> > .setDorisOptions(dorisBuilder.build());
> > fundCategoryDataStream.sinkTo(builder.build())
> >
> > .slotSharingGroup(parameterTool.get("fund_category_data_sink_group",
> > "fund_category_sink"))
> >
> > .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
> >
> > .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1"))
> > .name("fundCategorySinkName");
> >
> >
> >


Re: Flink falsely reports checkpoint failures

2023-05-03 Posted by Yanfei Lei
hi, scaling in/out restarts the job. During the restart, the job manager starts first; while some task managers have not yet started, the "Not all
required tasks are currently
running.." error can be reported. Once all tasks of the job are fully started, the error disappears.

Best,
Yanfei
Chen Yang wrote on Thu, May 4, 2023 at 09:44:
>
> Hello,
>
> My Flink job runs in reactive mode, with Kubernetes HPA automatically scaling the
> TaskManagers in and out. Whenever a TaskManager
> scales, Flink logs an error: the checkpoint fails because a pre-scaling TaskManager is no longer running, and a checkpoint-failure alert fires as well.
> In reality the checkpoints still complete and the job has no runtime errors. The error disappears after restarting the job. How can this problem be fixed?
>
> The detailed logs are as follows:
> 2022-12-13 05:08:22.339 [jobmanager-io-thread-1] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 40393 for job  (488170 bytes,
> checkpointDuration=2582 ms, finalizationTime=322 ms).
> 2022-12-13 05:08:28.083 [Checkpoint Timer] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointFailureManager  - Failed to
> trigger checkpoint for job  since
> Checkpoint triggering task Source: Custom Source -> Sink: Unnamed (1/79) of
> job  is not being executed at the moment.
> Aborting checkpoint. Failure reason: Not all required tasks are currently
> running..
> 2022-12-13 05:09:19.437 [Checkpoint Timer] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
> checkpoint 40394 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1670908159435 for job
> .
> 2022-12-13 05:09:25.208 [jobmanager-io-thread-1] INFO
>  org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
> flink-ingest-sps-nv-consumer/2022-11-15T01:10:30Z//chk-40394/_metadata
> with MPU ID
> _3vKXSVBMuBM7207EpGvCXOTRQskAiPPj88DSTTn55Uzuc_76dnubmTAPBovyWbKBKU8Wxqz6SuFBJ8cZnAOH_PkGEP36KJzMFYYPmT.xZvmLnM.YX1oJSHN3VP1TXpJECY8y80psYvRWvbt2e8CMeoa9JiOWiGYGRmqLGRdlQA-
> 2022-12-13 05:09:25.747 [jobmanager-io-thread-1] INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 40394 for job  (482850 bytes,
> checkpointDuration=5982 ms, finalizationTime=330 ms).
>
> Thanks,
> Chen


Re: Suspected Flink RocksDB memory leak, job killed by the Linux kernel

2023-04-23 Posted by Yanfei Lei
Hi,
Does the job configure state TTL?
Also check whether it resembles either of these two issues:
1. pin L0 index in memory : https://issues.apache.org/jira/browse/FLINK-31089
2. max open files:https://issues.apache.org/jira/browse/FLINK-31225
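For reference, the glibc workaround discussed below is usually applied through container environment variables; a sketch in flink-conf.yaml (the containerized.taskmanager.env.* prefix is standard Flink configuration, MALLOC_ARENA_MAX=1 is the commonly suggested value, and whether it cures this particular leak is not verified here):

containerized.taskmanager.env.MALLOC_ARENA_MAX: 1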

Biao Geng wrote on Sun, Apr 23, 2023 at 15:35:
>
> Hi,
> You can configure jemalloc to locate the off-heap memory leak.
> See these two articles for the concrete steps:
> https://cloud.tencent.com/developer/article/1884177
> https://chenhm.com/post/2018-12-05-debuging-java-memory-leak#_native_method_%E5%86%85%E5%AD%98
>
> Best,
> Biao Geng
>
> Guo Thompson wrote on Sat, Apr 22, 2023 at 09:57:
>
> > YARN. I have already disabled YARN's memory check, and that glibc parameter is already set to 1
> >
> Weihua Hu wrote on Fri, Apr 21, 2023 at 19:23:
> >
> > > Hi,
> > >
> > > Is your job running on YARN or Kubernetes? You could start with the glibc leak issue described in the docs
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Fri, Apr 21, 2023 at 6:04 PM Guo Thompson 
> > > wrote:
> > >
> > > > The Flink
> > > >
> > >
> > job is SQL-based, on Flink 1.13.3, with state stored in RocksDB. There appears to be a memory leak: after running for a while, the job is killed by the Linux kernel. Asking for help: how can this be solved?
> > > > The page
> > > >
> > >
> > http://www.whitewood.me/2021/01/02/%E8%AF%A6%E8%A7%A3-Flink-%E5%AE%B9%E5%99%A8%E5%8C%96%E7%8E%AF%E5%A2%83%E4%B8%8B%E7%9A%84-OOM-Killed/
> > > > says it is most likely caused by RocksDB memory that cannot be reclaimed.
> > > >
> > > > 1. The TM is allocated 30 GB; the JVM heap is far from fully used.
> > > > [image: 8f47b109-a04b-4bc1-8f64-fed21c58838d.jpeg]
> > > > 2. Memory usage seen from Linux is actually 44.4 GB, far beyond the configured 30 GB.
> > > > [image: image.png]
> > > > 3. A heap dump of the TM's JVM is actually under 2 GB (the dump triggers a full GC).
> > > > [image: image.png]
> > > >
> > >
> >



-- 
Best,
Yanfei


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Posted by Yanfei Lei
Congratulations!

Best Regards,
Yanfei

ramkrishna vasudevan wrote on Mon, Mar 27, 2023 at 21:46:
>
> Congratulations !!!
>
> On Mon, Mar 27, 2023 at 2:54 PM Yu Li  wrote:
>>
>> Dear Flinkers,
>>
>>
>> As you may have noticed, we are pleased to announce that Flink Table Store 
>> has joined the Apache Incubator as a separate project called Apache 
>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
>> streaming data lake platform for high-speed data ingestion, change data 
>> tracking and efficient real-time analytics, with the vision of supporting a 
>> larger ecosystem and establishing a vibrant and neutral open source 
>> community.
>>
>>
>> We would like to thank everyone for their great support and efforts for the 
>> Flink Table Store project, and warmly welcome everyone to join the 
>> development and activities of the new project. Apache Flink will continue to 
>> be one of the first-class citizens supported by Paimon, and we believe that 
>> the Flink and Paimon communities will maintain close cooperation.
>>
>>
>> Best Regards,
>>
>> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>>
>>
>>
>>
>> [1] https://paimon.apache.org/
>>
>> [2] https://github.com/apache/incubator-paimon
>>
>> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal


Re: Re: flink on yarn, question about abnormal power loss

2023-03-09 Posted by Yanfei Lei
Hi, you can look under the checkpoint dir configured for Flink for historical chk-x folders; if you can find a historical chk-x, you can try manually specifying that chk to restart [1].

> The job keeps 10 checkpoints, taken 5 seconds apart; with a sudden power loss, why were all of the checkpoints corrupted?
Does the job have incremental checkpoints enabled? With incremental checkpoints, if a file of an earlier checkpoint is corrupted, the later checkpoints built incrementally on it are affected as well.

> About the checkpoint flush-to-disk mechanism: this should be related to HDFS writes; the Flink checkpoint succeeds, yet HDFS does not flush to disk.
Do you mean you observe no files under the checkpoint dir?

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
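A sketch of manually restarting from a retained checkpoint, as described above (paths, job id, and entry class are placeholders):

bin/flink run -s hdfs:///flink/checkpoints/<job-id>/chk-42 -c com.example.MyJob my-job.jar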

guanyq wrote on Fri, Mar 10, 2023 at 08:58:
>
> At the moment I am also thinking of using savepoints to handle the power-loss problem,
> but I still have a question:
> the job keeps 10 checkpoints, taken 5 seconds apart; with a sudden power loss, why were all of the checkpoints corrupted?
> It is strange; perhaps none of the 10 checkpoints were flushed to disk.
> I would like to ask:
> about the checkpoint flush-to-disk mechanism, which should be related to HDFS writes: the Flink checkpoint succeeds, yet HDFS does not flush to disk.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-03-10 08:47:11, "Shammon FY" wrote:
> >Hi
> >
> >When a Flink
> >job fails to recover, the job itself can hardly tell that the failure was caused by something like corrupted checkpoint file blocks. If you have taken a savepoint for the job, you can try restoring from a specified savepoint.
> >
> >Best,
> >Shammon
> >
> >On Thu, Mar 9, 2023 at 10:06 PM guanyq  wrote:
> >
> >> Setup:
> >> 1. Flink is configured with high availability
> >> 2. Flink retains 10 checkpoints
> >> 3. The YARN cluster is configured for job recovery
> >> Question:
> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try starting from another checkpoint?
> >>
> >>
> >>
> >>



-- 
Best,
Yanfei


[ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released

2023-01-30 文章 Yanfei Lei
We are very happy to announce the release of FRocksDB 6.20.3-ververica-2.0.

Compiled files for Linux x86, Linux arm, Linux ppc64le, MacOS x86,
MacOS arm, and Windows are included in the FRocksDB 6.20.3-ververica-2.0
jar, and the FRocksDB in Flink 1.17 will be updated to
6.20.3-ververica-2.0.

Release highlights:
- [FLINK-30457] Add periodic_compaction_seconds option to RocksJava[1].
- [FLINK-30321] Upgrade ZLIB of FRocksDB to 1.2.13[2].
- Avoid expensive ToString() call when not in debug[3].
- [FLINK-24932] Support build FRocksDB Java on Apple silicon[4].

Maven artifacts for FRocksDB can be found at:
https://mvnrepository.com/artifact/com.ververica/frocksdbjni
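For reference, pulling the release into a Maven build (coordinates exactly as published above):

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>frocksdbjni</artifactId>
  <version>6.20.3-ververica-2.0</version>
</dependency>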

We would like to thank all efforts from the Apache Flink community
that made this release possible!

[1] https://issues.apache.org/jira/browse/FLINK-30457
[2] https://issues.apache.org/jira/browse/FLINK-30321
[3] https://github.com/ververica/frocksdb/pull/55
[4] https://issues.apache.org/jira/browse/FLINK-24932

Best regards,
Yanfei
Ververica(Alibaba)


Re: What is the impact of setting slots to several times the number of vcores

2022-11-07 Posted by Yanfei Lei
Hi junjie,
A slot can be regarded as a thread in the JVM [1], so taskmanager.numberOfTaskSlots can be set higher than the number of CPU cores.

> What is the impact of setting slots to several times the vcores like this?

Setting slots to several times the vcores can make the job resource-bound (CPU, memory, disk, network bandwidth, etc.). I once hit an out-of-disk problem caused by having too many slots (the subtask on each slot had fairly large state).
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources

Best,
Yanfei

junjie.m...@goupwith.com wrote on Mon, Nov 7, 2022 at 19:24:

> hi,
> A question: suppose I have only one server with only 8 vcores. When starting standalone, can I set taskmanager.numberOfTaskSlots >
> 8?
> I tried 32 and other values and it works, and jobs fully occupying the slots also run fine. What is the impact of setting slots to several times the vcores?
> Thanks!!!
>


-- 
Best,
Yanfei


Re: Re: flink 1.15.1 stop job fails

2022-10-14 Posted by yanfei lei
Hi yidan && hjw,
I reproduced this problem locally with FlinkKafkaConsumer as well, but stop-with-savepoint works fine with KafkaSource. FlinkKafkaConsumer
has been deprecated since Flink 1.15 [1]; I recommend trying the new KafkaSource.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sourcefunction
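For reference, a minimal KafkaSource sketch replacing the deprecated FlinkKafkaConsumer (bootstrap servers, topic, and group id are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")        // placeholder
        .setTopics("input-topic")                     // placeholder
        .setGroupId("my-group")                       // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
// per the reply above, stop-with-savepoint works with this source
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
env.execute();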

Best,
Yanfei

hjw <1010445...@qq.com.invalid> wrote on Tue, Aug 23, 2022 at 23:39:

> I believe this problem is caused by the Kafka connector using the old API. It can be reproduced just by running locally in IDEA. I have already filed a Jira for it:
> https://issues.apache.org/jira/browse/FLINK-28758. There has been no feedback from the community yet.
>
>
> -- Original message --
> From: "user-zh" <hinobl...@gmail.com>
> Sent: Tuesday, Aug 23, 2022, 11:09 PM
> To: "user-zh"
> Subject: Re: Re: flink 1.15.1 stop job fails
>
>
>
> 1. Most likely a problem in the source part, or at the savepoint trigger level.
> 2. You could also think about it via the difference between cancel and stop?
> 3. Additional info: my Kafka source uses the old API (I cannot use the new one; for various reasons I must use a low-version Kafka client).
>
> yidan zhao wrote:
>  I took a look; there are very few errors.
>  In any case, flink cancel -s works but flink stop does not, and it appears to fail instantly. From the web
>  UI, overall savepoint completion is 0/841, so it must error out almost before starting.
>  Currently 4 machines:
>  Machine 1
>  2022-08-23 22:47:37,093 WARN
> 
> org.apache.flink.runtime.taskmanager.Task
> [] -
>  Source: JobConfig - Split(JobName_configType)
>  (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from
> RUNNING to
>  FAILED with failure cause:
>  org.apache.flink.util.FlinkRuntimeException: Stop-with-savepoint failed.
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskAction
>  Executor.java:93)
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:33
>  8)
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
>  at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>  at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>  at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>  at
> java.lang.Thread.run(Thread.java:748)
>  After that it is just assorted free task / unregister messages.
> 
>  Machine 2
>  ...
>  Basically everything seems to start from "Attempt to cancel"; it does not look like it errors first and then cancels.
> 
>  Xuyang wrote:
>   Hi, is there any error message on the TM? If so, please paste it so we can see what caused the cp failure.
>  
>  
>  
>  
>  
>  
>  
>   --
>  
>   Best!
>   Xuyang
>  
>  
>  
>  
>  
>   Hi, is there any error message on the TM? If so, please paste it so we can see what caused the cp failure.
>   At 2022-08-23 20:41:59, "yidan zhao" added more information:
>   Looking at the logs, when a checkpoint is triggered via flink savepoint xxx, the JM log is simple:
>   2022-08-23 20:33:22,307 INFO
>  
> org.apache.flink.runtime.jobmaster.JobMaster
> [] -
>   Triggering savepoint for job 8d231de75b8227a1b
>   715b1aa665caa91.
>   
>   2022-08-23 20:33:22,318 INFO
>  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> [] -
>   Triggering checkpoint 5 (type=SavepointType{na
>   me='Savepoint', postCheckpointAction=NONE,
> formatType=CANONICAL}) @
>   1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
>   
>   2022-08-23 20:33:23,701 INFO
>  
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
>   [] - Cannot create recoverable writer
>due to Recoverable writers on Hadoop are only supported for
> HDFS,
>   will use the ordinary writer.
>   
>   2022-08-23 20:33:23,908 INFO
>  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> [] -
>   Completed checkpoint 5 for job
> 8d231de75b8227a1b715b1aa665caa91
>   (1638207 bytes, checkpointDuration=1600 ms,
> finalizationTime=1 ms).
>   
>   
>   When the job is stopped via stop xxx, the JM log (error) is as follows:
>   
>   2022-08-23 20:35:01,834 INFO
>  
> org.apache.flink.runtime.jobmaster.JobMaster
> [] -
>   Triggering stop-with-savepoint for job
>   8d231de75b8227a1b715b1aa665caa91.
>   
>   2022-08-23 20:35:01,842 INFO
>  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> [] -
>   Triggering checkpoint 6 (type=SavepointType{name='Suspend
> Savepoint',
>   postCheckpointAction=SUSPEND, formatType=CANONICAL}) @
> 1661258101834
>   for job 8d231de75b8227a1b715b1aa665caa91.
>   
>   2022-08-23 20:35:02,083 INFO
>  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> [] -
>   Decline checkpoint 6 by task
> a65383dad01bc15f654c4afe4aa63b6d of job
>   8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5
> @
>   xxx.xxx.com (dataPort=13156).
>   (here it looks like it was declined; the reason is task 

Re: Re: OutOfMemoryError: Direct buffer memory

2022-10-11 Posted by yanfei lei
Hi,
*- The cluster runs only this one job at a time, and the next job starts only after the previous one finishes. If the off-heap memory was allocated by an earlier job's reads/writes, is it released immediately when that job finishes?*

After each job execution, do you actively close the query results and the opened resources? You could also follow [1] to check whether the job itself has a memory leak.

*- The exception appears only after several jobs have run; the earlier jobs all succeed and later ones fail, which looks like a memory leak.*

Are your SQL jobs executed through the SQL Client? The SQL
Client can submit multiple asynchronous jobs at the same time [2]; check the running jobs in the session cluster's WebUI to confirm whether the earlier jobs have finished.

> By default, SQL Client executes DML statements asynchronously. That means,
> SQL Client will submit a job for the DML statement to a Flink cluster, and
> not wait for the job to finish. So SQL Client can submit multiple jobs at
> the same time


*- Is there more documentation on Flink TaskManager off-heap memory management?*
See whether this helps: "Off-heap Memory in Apache Flink and the curious JIT compiler" [3]

[1]
https://nodejh.com/posts/flink-%E4%BB%BB%E5%8A%A1%E5%86%85%E5%AD%98%E6%B3%84%E6%BC%8F%E5%AF%BC%E8%87%B4%E9%A2%91%E7%B9%81-full-fc-%E5%AF%BC%E8%87%B4-cpu-%E6%9A%B4%E5%A2%9E%E9%97%AE%E9%A2%98%E6%8E%92%E6%9F%A5/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-dml-statements-syncasync
[3]
https://flink.apache.org/news/2015/09/16/off-heap-memory.html?spm=a2c6h.12873639.article-detail.7.506a2e399nTOH6#appendix-detailed-micro-benchmarks
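For reference, the knobs discussed in this thread live in flink-conf.yaml; a sketch with purely illustrative values:

taskmanager.memory.task.off-heap.size: 256m
taskmanager.memory.framework.off-heap.size: 128m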

RS wrote on Tue, Oct 11, 2022 at 09:26:

> Hi,
> Increasing taskmanager.memory.task.off-heap.size should solve part of the problem.
>
> I still have some questions. This is a session cluster; it runs only one job at a time, and the next starts after the previous finishes. If the off-heap memory was allocated by an earlier job's read/write, is it released immediately when that job finishes?
> The exception appears only after several jobs; the earlier jobs all succeed and later ones fail, which looks like a memory leak. Is there more documentation on Flink
> taskmanager off-heap memory management? (I have read the official docs.)
> Thanks
>
> On 2022-10-10 12:34:55, "yanfei lei" wrote:
> >Judging from the error, it is caused by insufficient direct memory; try increasing taskmanager.memory.task.off-heap.size.
> >
> >Best,
> >Yanfei
> >
> >allanqinjy wrote on Sat, Oct 8, 2022 at 21:19:
> >
> >>
> >>
From the stack it is out of memory; try increasing it a bit. I once hit an OOM reading an offline geo-location library from HDFS and solved it by adjusting the memory size. That job was built with the streaming API, version 1.12.5.
> >>
> >>
> >> allanqinjy
> >> allanqi...@163.com
> >> (signature customized by NetEase Mail Master)
> >>
> >>
> >> On 10/8/2022 21:00,RS wrote:
> >> Hi,
> >>
> >>
> >> Version: Flink-1.15.1
> >>
> >>
> >> Scenario: read files from HDFS and process the data, batch mode, parallelism 10, defined and executed with Flink
> >> SQL; the source is connector=filesystem, format=raw, path=
> >>
> >>
> >> When running the job, it sometimes succeeds, and sometimes fails and then keeps failing; restarting the cluster seems to fix it. What could cause this?
> >>
> >>
> >> The cluster's off-heap settings are all defaults:
> >> taskmanager.memory.task.off-heap.size=0
> >> taskmanager.memory.framework.off-heap.size=128MB
> >>
> >>
> >> Error stack:
> >> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> >> out-of-memory error has occurred. This can mean two things: either
> job(s)
> >> require(s) a larger size of JVM direct memory or there is a direct
> memory
> >> leak. The direct memory can be allocated by user code or some of its
> >> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> >> configuration option should be increased. Flink framework and its
> >> dependencies also consume the direct memory, mostly for network
> >> communication. The most of network memory is managed by Flink and should
> >> not result in out-of-memory error. In certain special cases, in
> particular
> >> for jobs with high parallelism, the framework may require more direct
> >> memory which is not managed by Flink. In this case
> >> 'taskmanager.memory.framework.off-heap.size' configuration option
> should be
> >> increased. If the error persists then there is probably a direct memory
> >> leak in user code or some of its dependencies which has to be
> investigated
> >> and fixed. The task executor has to be shutdown...
> >> at java.nio.Bits.reserveMemory(Bits.java:695)
> >> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >> at
> >>
> org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
> >> at
> >>
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
> >> at
> >>
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
> >> at
> >>
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
> >> at
> >>
> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
> >> at
> >>
> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
> >> at
> >>
> org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
&

Re: Re: does table store conflict with the connector-kafka jar?

2022-10-09 Posted by yanfei lei
Hi, from the table store pom, the table store dist jar shades its own copy of connector-kafka:
https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
Try removing flink-connector-kafka-1.15.1.jar and see?


RS wrote on Sat, Oct 8, 2022 at 17:19:

> Hi,
> The error is as follows:
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Multiple factories for
> identifier 'kafka' that implement
> 'org.apache.flink.table.factories.DynamicTableFactory' found in the
> classpath.
>
>
> Ambiguous factory classes are:
>
>
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
> org.apache.flink.table.store.kafka.KafkaLogStoreFactory
>
> org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
>
>
> Thanks
>
>
>
>
>
> On 2022-10-08 13:38:20, "Shammon FY" wrote:
> >Hi RS
> >Could you provide the concrete error stack of the conflict?
> >
> >On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
> >
> >> Hi,
> >>
> >>
> >> Version: flink-1.15.1
> >> To use table
> >>
> store, you need to put flink-table-store-dist-0.2.0.jar under lib; the cluster's lib already had flink-connector-kafka-1.15.1.jar, and when defining a Kafka source table in sql-client, the connectors conflict.
> >>
> >>
> Does having flink-table-store-dist-0.2.0.jar under lib mean flink-connector-kafka-1.15.1.jar cannot be there?
> >>
> >>
> >> Thanks
>


Re: OutOfMemoryError: Direct buffer memory

2022-10-09 Posted by yanfei lei
Judging from the error, it is caused by insufficient direct memory; try increasing taskmanager.memory.task.off-heap.size.

Best,
Yanfei

allanqinjy wrote on Sat, Oct 8, 2022 at 21:19:

>
> From the stack it is out of memory; try increasing it a bit. I once hit an OOM reading an offline geo-location library from HDFS and solved it by adjusting the memory size. That job was built with the streaming API, version 1.12.5.
>
>
> allanqinjy
> allanqi...@163.com
> (signature customized by NetEase Mail Master)
>
>
> On 10/8/2022 21:00,RS wrote:
> Hi,
>
>
> Version: Flink-1.15.1
>
>
> Scenario: read files from HDFS and process the data, batch mode, parallelism 10, defined and executed with Flink
> SQL; the source is connector=filesystem, format=raw, path=
>
>
> When running the job, it sometimes succeeds, and sometimes fails and then keeps failing; restarting the cluster seems to fix it. What could cause this?
>
>
> The cluster's off-heap settings are all defaults:
> taskmanager.memory.task.off-heap.size=0
> taskmanager.memory.framework.off-heap.size=128MB
>
>
> Error stack:
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies which has to be investigated
> and fixed. The task executor has to be shutdown...
> at java.nio.Bits.reserveMemory(Bits.java:695)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at
> org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
> at
> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
> at
> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
> at
> org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
> at
> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at 

Re: In Flink data transfer, does the upstream operator push to the downstream or does the downstream pull from the upstream? What were the design considerations?

2022-09-21 Posted by yanfei lei
Hi,
The Flink community has a blog post about credit-based flow control
that explains the principles and trade-offs of the backpressure mechanism; hope it helps.

Shammon FY wrote on Wed, Sep 21, 2022 at 11:43:

> Hi
> Personally I think simply saying Flink's data transfer is a pull model can be misleading. The two models are usually understood as follows:
> 1. Push model
> After the upstream and downstream tasks initialize the network connection, the upstream task keeps "pushing" data downstream through it.
> 2. Pull model
> After the connection is initialized, the downstream task, according to its own progress, polls the upstream with requests to "pull" data and then runs the next round of computation.
>
> In Flink, the upstream/downstream interaction has these main steps:
> 1. The TM hosting the upstream task creates a Netty server.
> 2. The downstream task connects to the upstream with a Netty client at startup.
> 3. The downstream task sends the upstream a partition
> request; the upstream creates a data reader for it and keeps reading data and sending it over the connection.
> 4. The upstream and downstream tasks each have their own memory pools for flow control, roughly:
> a) the downstream task updates the upstream's credit from time to time based on its consumption memory pool;
> b) the upstream task updates its locally managed credit from the received credit messages;
> c) the upstream task keeps sending data to the downstream within its local credit.
>
> This way, given sufficient resources, data transfer is guaranteed to be fully streaming; unlike the traditional pull model, it is perhaps more like a push model with credit-based flow control.
>
> On Wed, Sep 21, 2022 at 9:43 AM yh z  wrote:
>
> > Hello. Flink uses a pull model. Its advantages: 1.
> > better scalability (downstream consumers can be added on demand; they only need the upstream consumption offset); 2. downstream consumers can adjust their consumption rate as needed;
> > 3. network transfer: Flink also tried a push model before, and to save overhead TCP connections were multiplexed between processes, so one task
> thread's performance bottleneck would leave all
> > task threads on the whole link unable to receive data, hurting the overall consumption rate. The push model's advantage: lower overhead, with no need for a mechanism that keeps polling the upstream for data.
> >
> > Xuyang wrote on Fri, Sep 9, 2022 at 20:35:
> >
> > > Hi, it is mainly a pull model: the downstream actively pulls data from the upstream. When the downstream's consumption capacity reaches its limit, the backpressure mechanism makes the upstream produce less data.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Best!
> > > Xuyang
> > >
> > >
> > >
> > >
> > >
> > > On 2022-09-09 19:04:27, "郑 致远" wrote:
> > > >Hi all,
> > > >A quick question:
> > > >In Flink's data transfer, does the upstream operator push to the downstream, or does the downstream operator pull from the upstream? What was the consideration behind this design?
> > >
> >
>


Re: Cannot connect to an HA HDFS cluster in native k8s deployment mode

2022-09-21 Posted by yanfei lei
Hi Tino,
Looking at org.apache.flink.core.fs.FileSystem.java,
Flink parses fs.default-scheme directly as a URI, with no logic that resolves the related xml configuration, so it looks like Flink does not currently support an HA-style HDFS cluster there.
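A common workaround (an assumption here, not verified against this setup) is to make the Hadoop client configuration that defines the datacluster nameservice visible to every Flink container via HADOOP_CONF_DIR, so the Hadoop FileSystem can resolve the nameservice; sketched as extra submit parameters:

-Dcontainerized.master.env.HADOOP_CONF_DIR=/opt/flink/conf \
-Dcontainerized.taskmanager.env.HADOOP_CONF_DIR=/opt/flink/conf \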

Best,
Yanfei

Xuyang wrote on Wed, Sep 21, 2022 at 23:28:

> Hi, I am not very familiar with HA HDFS deployment, but from the error stack it is caused by an unresolvable hostname:
> Caused by: java.lang.IllegalArgumentException:
> java.net.UnknownHostException: datacluster
> I guess you could try changing it to one of these two:
> 1. hdfs://datacluster:port (e.g. hdfs://datacluster:8080)
>
> 2. hdfs:///datacluster (three slashes)
>
>
>
>
> Hope this helps
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> On 2022-09-21 18:24:46, "Tino Hean" wrote:
> >*Hi all,*
> >*I am testing an HA HDFS cluster with the k8s cluster deployment mode; below are my submit command parameters*
> >./bin/flink run-application \
> >--detached \
> >--target kubernetes-application \
> >-Dkubernetes.cluster-id=test \
> >-Dkubernetes.container.image=flink-java11 \
> >-Dfs.default-scheme=hdfs://datacluster \
> >-Dkubernetes.rest-service.exposed.type=LoadBalancer \
> >
>
> >-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> >\
> >-Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
> >-Dkubernetes.namespace=flink \
> >-Dkubernetes.service-account=flink-sa \
> >-Denv.hadoop.conf.dir=/opt/flink/conf \
> >-Dkubernetes.container.image.pull-policy=Always \
> >local:///opt/flink/usrlib/test.jar
> >
> >*I have copied core-site.xml and hdfs-site.xml into $FLINK_HOME/conf; the directory layout is as follows*
> >flink@e3187a41a139:~$ ls conf
> >core-site.xml hdfs-site.xml log4j-console.properties
> >log4j-session.properties logback-session.xml masters zoo.cfg
> >flink-conf.yaml log4j-cli.properties log4j.properties logback-console.xml
> >logback.xml workers
> >
> >*But I got the following error:*
> >
> >2022-09-21 10:17:40,156 ERROR
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could
> not
> >start cluster entrypoint KubernetesApplicationClusterEntrypoint.
> >org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> >initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
> >at
>
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:250)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:711)
> >[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
> >[flink-dist-1.15.2.jar:1.15.2]
> >Caused by: org.apache.flink.util.FlinkException: Could not create the ha
> >services from the instantiated HighAvailabilityServicesFactory
> >org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
> >at
>
> >org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:287)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:227)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at java.security.AccessController.doPrivileged(Native Method)
> ~[?:?]
> >at javax.security.auth.Subject.doAs(Unknown Source) ~[?:?]
> >at
>
> >org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>
> >~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
> >at
>
> >org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:224)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >... 2 more
> >Caused by: java.io.IOException: Could not create FileSystem for highly
> >available storage path (hdfs://datacluster/flink/recovery/cruiser)
> >at
>
> >org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> >org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
> >~[flink-dist-1.15.2.jar:1.15.2]
> >at
>
> 

Re: About Flink state

2022-08-30 Posted by yanfei lei
Hi,
1) State cannot be shared across different operators. As yue ma suggested, you could store the part to be shared in an external system and access that same system from both maps.
2) Besides operator state, a custom keySelector that always returns the same value can also gather all keys together; see the sketch below.
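A minimal sketch of that constant-key idea (all names hypothetical); every record maps to the same key, so a single ValueState holds the global total, at the cost of effectively serializing that operator:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Long> totals = input                    // input: a hypothetical DataStream<Long>
    .keyBy(value -> 0)                             // constant key: all records share one key
    .map(new RichMapFunction<Long, Long>() {
        private transient ValueState<Long> total;

        @Override
        public void open(Configuration parameters) {
            total = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("total", Types.LONG));
        }

        @Override
        public Long map(Long value) throws Exception {
            long t = (total.value() == null ? 0L : total.value()) + value;
            total.update(t);                       // running global total
            return t;
        }
    });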

yue ma wrote on Tue, Aug 30, 2022 at 14:20:

> hi
> 1) Flink's internal state cannot be shared between operators, so you may need external storage (e.g. Redis) for something like that.
> 2) You can look at how operator state is used:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#operator-state
>
>
> 曲洋 wrote on Tue, Aug 30, 2022 at 12:32:
>
> > Hello everyone,
> >I would like to ask two questions:
> > 1) Can Flink state be shared across different operators? For example, the first map has a piece of state; can the second map still access that state?
> > 2) Is there Flink state that does not require keyBy? I want to compute a global total and there is no suitable key to choose.
> >
> >
>


Re: get state from window

2022-08-17 Posted by yanfei lei
Hi, there are two methods on the Context object that a process() invocation
receives that allow access to the two types of state:

   - globalState(), which allows access to keyed state that is not scoped
   to a window
   - windowState(), which allows access to keyed state that is also scoped
   to the window

Maybe you can refer to the implementation of WindowReaderTest.
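A minimal sketch of both accessors inside a ProcessWindowFunction (a hypothetical counting example, not from the thread):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountingWindowFunction
        extends ProcessWindowFunction<Long, String, String, TimeWindow> {

    private final ValueStateDescriptor<Long> desc =
            new ValueStateDescriptor<>("count", Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Long> elements,
                        Collector<String> out) throws Exception {
        // windowState(): keyed state scoped to the current window
        ValueState<Long> perWindow = ctx.windowState().getState(desc);
        // globalState(): keyed state shared by all windows of this key
        ValueState<Long> global = ctx.globalState().getState(desc);
        long inWindow = 0;
        for (Long ignored : elements) {
            inWindow++;
        }
        perWindow.update(inWindow);
        global.update((global.value() == null ? 0L : global.value()) + inWindow);
        out.collect(key + ": window=" + inWindow + ", total=" + global.value());
    }
}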

Best,
Yanfei

On Aug 18, 2022, at 10:05 AM, 曲洋 wrote:

Hi dear engineers,

I have one question: does Flink streaming support getting the state? I
override the open method in the map processor, initializing the state (some
metric for the data). How can I get that state in my window?

Thanks for your help!


Re: The Flink Chinese mailing list does not show other users' Flink questions.

2022-07-13 Posted by yanfei lei
hi, by "the list" do you mean your inbox? You can view other users' questions and answers at
https://lists.apache.org/list.html?user-zh@flink.apache.org .

Best,
Yanfei

张锴 wrote on Thu, Jul 14, 2022 at 09:17:

> I re-subscribed to the Flink Chinese mailing list, but the list does not show other users' questions or answers about Flink. What is the reason?
>


Re: Can expired Flink state be written to the log?

2022-07-08 Posted by yanfei lei
Hi,
Flink does not yet support a callback on expiry cleanup. If you are using the cleanupIncrementally strategy (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state), you can add the corresponding log statements yourself in the `TtlIncrementalCleanup` class.
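For reference, a minimal sketch of enabling the incremental cleanup strategy mentioned above (values illustrative):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))           // expire entries 1 hour after last write
        .cleanupIncrementally(10, true)      // check up to 10 entries per state access/record
        .build();

ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("my-state", String.class);
desc.enableTimeToLive(ttlConfig);            // TtlIncrementalCleanup runs on this path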

> On Jun 27, 2022, at 14:09, haishui wrote:
> 
> Hi,
> Can Flink state expiry work like a Caffeine cache, invoking a callback when an entry expires so the expired content is printed in the log?
> 
> Best Regards!