Flink usage question

2023-03-08 Post by 陈隽尧
Hello,

 I am a new Flink user. A recent project requires implementing a business feature with Flink, but I have run into some difficulties and would like to ask whether there is a suitable solution. Looking forward to your reply.

 

 
Problem background: we need to compute account-level metrics for stock accounts from the stock trade stream and the market quote stream (to simplify the scenario, assume the account metrics are only position, average purchase price, and market value). The front-end page refreshes every 20 s, and we would like to implement the metric computation with Flink's DataStream API, but we have hit a problem. Our initial idea is below; guidance from Flink experts would be much appreciated.

 

Initial design sketch: assume stream1 is the stock trade stream and stream2 is the market quote stream:
stream1.keyBy().connect(stream2.keyBy()).process(), where the key is the stock code. Inside the process function:

 

- open(): load the metric values precomputed at the start of the day into a ListState; each ListState entry has four fields: account, position, average purchase price, and market value (all day-start values).

 

- processElement1(): for each trade record, compute the metrics affected only by trades (such as position and average purchase price) and update the ListState.

 

- processElement2(): only cache the quote as state (a MapState keyed by stock code), keeping the latest quote per instrument. Since the system only refreshes every 20 s while quotes arrive relatively frequently, roughly every 3 s, there is no need to recompute on every quote.

 

- Register a timer that fires every 20 s. In onTimer(), compute the market value from the ListState above and the MapState holding the latest quotes, then write the market value back into the ListState (the market value logic is latest position * latest quote price). onTimer() only emits the ListState entries that changed within the last 20 s to the next operator, which performs the aggregation. (A sketch of the whole design follows this list.)
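Roughly, what I have in mind in code (just a sketch: Trade, Quote, and AccountMetrics are placeholder POJOs with the obvious fields and helpers; I keep per-account metrics in a MapState instead of a ListState so single entries can be updated in place, and since keyed state cannot be populated in open(), the day-start snapshot is loaded lazily per account):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: keyed by stock code; Trade/Quote/AccountMetrics are placeholders.
public class AccountMetricsFunction
        extends KeyedCoProcessFunction<String, Trade, Quote, AccountMetrics> {

    private transient MapState<String, AccountMetrics> metricsByAccount; // account -> metrics
    private transient MapState<String, Boolean> dirtyAccounts;           // changed since last timer
    private transient ValueState<Quote> latestQuote;                     // newest quote for this key
    private transient ValueState<Boolean> timerArmed;

    @Override
    public void open(Configuration parameters) {
        // Keyed state has no active key inside open(), so the day-start snapshot
        // cannot be bulk-loaded here; it is loaded lazily per account instead.
        metricsByAccount = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("metrics", String.class, AccountMetrics.class));
        dirtyAccounts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("dirty", String.class, Boolean.class));
        latestQuote = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latestQuote", Quote.class));
        timerArmed = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timerArmed", Boolean.class));
    }

    @Override
    public void processElement1(Trade trade, Context ctx,
                                Collector<AccountMetrics> out) throws Exception {
        AccountMetrics m = metricsByAccount.get(trade.account);
        if (m == null) {
            m = loadDayStartMetrics(ctx.getCurrentKey(), trade.account);
        }
        m.applyTrade(trade); // update position and average purchase price
        metricsByAccount.put(trade.account, m);
        dirtyAccounts.put(trade.account, Boolean.TRUE);
        armTimer(ctx);
    }

    @Override
    public void processElement2(Quote quote, Context ctx,
                                Collector<AccountMetrics> out) throws Exception {
        latestQuote.update(quote); // cache only; do not recompute on every tick.
        // If a quote change should also count as a "changed" entry, mark the
        // affected accounts dirty here as well.
        armTimer(ctx);
    }

    @Override
    public void onTimer(long ts, OnTimerContext ctx,
                        Collector<AccountMetrics> out) throws Exception {
        // Per the Flink docs, invocations of processElement() and onTimer() are
        // synchronized, so reading and clearing the dirty flags here should not
        // need any extra user-level locking.
        Quote q = latestQuote.value();
        for (String account : dirtyAccounts.keys()) {
            AccountMetrics m = metricsByAccount.get(account);
            if (q != null) {
                m.marketValue = m.position * q.lastPrice; // position * latest price
                metricsByAccount.put(account, m);
            }
            out.collect(m); // only entries that changed go downstream
        }
        dirtyAccounts.clear();
        ctx.timerService().registerProcessingTimeTimer(ts + 20_000L); // keep 20 s cadence
    }

    private void armTimer(Context ctx) throws Exception {
        if (timerArmed.value() == null) {
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 20_000L);
            timerArmed.update(Boolean.TRUE);
        }
    }

    // Placeholder: fetch the precomputed day-start snapshot for this account.
    private AccountMetrics loadDayStartMetrics(String stockCode, String account) {
        return new AccountMetrics(); // hypothetical lookup
    }
}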
 

 

 

  
Open question: how do I know which ListState entries changed during each 20 s interval when onTimer fires? My understanding is that processElement and onTimer are handled on two separate threads. If processElement marks each modified ListState entry with a dirty flag, and onTimer reads the flagged entries and then clears the flags, guaranteeing correctness would require thread synchronization, and doing manual thread synchronization inside Flink does not feel appropriate.

 

- The reason for not using stream1.coGroup(stream2).where().equalTo().window(TumblingEventTimeWindows.of(Time.hours(1))).apply() is that the CoGroupFunction only fires when the window closes, whereas the trade stream can be processed one record at a time; we do not want to turn this into micro-batch processing (see the sketch after this list).

- The reason for computing in a timer rather than in processElement2 is that the result only needs to be updated every 20 s while quotes update frequently; there is no need to waste compute recalculating on every incoming quote.
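For reference, the rejected windowed variant would look roughly like this (again just a sketch: trades and quotes stand for the two streams above, and Trade/Quote are the same placeholder POJOs):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Windowed coGroup alternative: apply() only runs when the one-hour window
// closes, i.e. once per window rather than once per trade record.
DataStream<AccountMetrics> result = trades
        .coGroup(quotes)
        .where(t -> t.stockCode)    // key selector on the trade stream
        .equalTo(q -> q.stockCode)  // key selector on the quote stream
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .apply(new CoGroupFunction<Trade, Quote, AccountMetrics>() {
            @Override
            public void coGroup(Iterable<Trade> ts, Iterable<Quote> qs,
                                Collector<AccountMetrics> out) {
                // fires once at window close: the micro-batch behavior
                // we want to avoid
            }
        });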





Re: Re:Re: Re: Using Flink SQL to sync Kafka data to MySQL: a delete on the Kafka source sends an op=d record that MySQL deletes, but at the same time an older version of that row is re-inserted.

2023-03-08 Post by Jane Chan
From the plan it looks like a SinkUpsertMaterializer[1] was added at the sink node because no upsert key could be derived. It performs a keyBy shuffle on the primary key defined on the sink table[2] and can only guarantee eventual consistency.
Also, the schema in your description of the operations has three columns, but the DDL has four, and the formatting is garbled.

Some suggestions that may help:

1. If the upstream data has a primary key and it is rowid, declare the PK on the Flink source table to avoid generating the extra materializer node; also, do not include metadata columns (such as op) when declaring the Flink source table, since they lead to non-deterministic updates[3].
2. Check that the PK columns of the physical table in the MySQL database match the PK columns of the Flink SQL sink table (see the sketch after the references).

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
[2]
https://github.com/apache/flink/blob/3ea83baad0c8413f8e1f4a027866335d13789538/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L378
[3]
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/determinism/#31-%e6%b5%81%e4%b8%8a%e7%9a%84%e4%b8%8d%e7%a1%ae%e5%ae%9a%e6%80%a7
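For suggestion 2, a hypothetical sketch of an aligned sink declaration (column names follow the plan below; tEnv is an assumed StreamTableEnvironment, and the connection options are placeholders):

// Sketch for suggestion 2: the Flink sink table declares the same primary key
// as the physical MySQL table, so the JDBC connector writes in upsert mode on
// that key. Types and connection details are illustrative.
String mysqlSink =
    "CREATE TABLE `电话_1` ("
        + " `rowID` VARCHAR(255),"
        + " `名称` VARCHAR(2147483647),"
        + " `手机` VARCHAR(2147483647),"
        + " `座机` VARCHAR(255),"
        + " PRIMARY KEY (`rowID`) NOT ENFORCED" // must match the MySQL table's PK
        + ") WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:mysql://...',"        // placeholder
        + " 'table-name' = '...',"              // placeholder: the MySQL table
        + " 'username' = '...',"
        + " 'password' = '...'"
        + ")";
tEnv.executeSql(mysqlSink);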

Best,
Jane

On Mon, Mar 6, 2023 at 11:24 AM 陈佳豪  wrote:

> I just ran a test.
> Suppose there are 3 rows of data to sync (full load):
>
> | 编号 | 电话    | 座机 |
> | 1    | 1311313 | 123  |
> | 2    | 1311313 | 456  |
> | 3    | 1311313 | 789  |
>
> Then I modify two fields of the fourth row (incremental):
>
> | 编号 | 电话        | 座机 |
> | 1    | 1311313     | 123  |
> | 2    | 1311313     | 456  |
> | 3    | 13113133110 | 888  |
>
> After the modification I delete record 2. Checking MySQL, record 2 is correctly deleted and nothing new is inserted (correct behavior).
> Then I go on to delete record 3, and this time it goes wrong: Flink holds cached data from the two modifications, so at the same moment as the delete, the old pre-modification row is inserted into MySQL (incorrect behavior).
>
> The above is the result of my testing on Flink 1.16.1. I currently do not know whether something still needs to be configured in Flink or what exactly to configure on the downstream operator. This problem has been troubling me for 3 weeks, and none of my test adjustments have worked.
>
> On 2023-03-06 10:54:23, "陈佳豪" wrote:
> >Hi, good morning.
>
> >I upgraded Flink to 1.16.1 and ran the Kafka-to-MySQL sync job, but the same problem is still there. Running EXPLAIN locally produced the following output:
> >
> >== Abstract Syntax Tree ==
> >LogicalSink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机])
> >+- LogicalProject(rowID=[CAST($0):VARCHAR(255) CHARACTER SET "UTF-16LE"], 名称=[$1], 手机=[CAST($2):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 座机=[CAST($3):VARCHAR(255) CHARACTER SET "UTF-16LE"])
> >   +- LogicalTableScan(table=[[default_catalog, default_database, 电话]])
> >
> >== Optimized Physical Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255) CHARACTER SET "UTF-16LE") AS 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])
> >
> >== Optimized Execution Plan ==
> >Sink(table=[default_catalog.default_database.电话_1], fields=[rowID, 名称, 手机, 座机], upsertMaterialize=[true])
> >+- Calc(select=[CAST(rowid AS VARCHAR(255)) AS rowID, 63fd65fb36521f81a2cfab90 AS 名称, CAST(63fd660536521f81a2cfabad AS VARCHAR(2147483647)) AS 手机, CAST(63fd660536521f81a2cfabae AS VARCHAR(255)) AS 座机])
> >   +- TableSourceScan(table=[[default_catalog, default_database, 电话]], fields=[rowid, 63fd65fb36521f81a2cfab90, 63fd660536521f81a2cfabad, 63fd660536521f81a2cfabae])
> >
> >
> >
> >On 2023-03-05 15:37:53, "Jane Chan" wrote:
> >>Hi,
> >>
> >>Sorry, that was a typo; it should be 1.16.1. I verified the query you sent earlier on 1.16.1 and the delete works correctly. You can try it on 1.16.1, or on 1.15.2 try EXPLAIN CHANGELOG_MODE INSERT INTO...[1] to print the plan and take a look.
> >>
> >>[1]
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >>
> >>Best,
> >>Jane
> >>
> >>On Sun, Mar 5, 2023 at 2:36 PM 陈佳豪  wrote:
> >>
> >>> Hi,
> >>> There is no 1.16.2 release yet, right? On the Flink website I only see 1.16.0 and 1.16.1.
> >>>
> >>> On 2023-03-02 11:52:41, "Jane Chan" wrote:
> >>> >Hi,
> >>> >
> >>> >You can try printing the plan with EXPLAIN CHANGELOG_MODE[1], or try upgrading to Flink 1.16; this query was verified to work on 1.16.2.
> >>> >
> >>> >[1]
> >>> >
> >>>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
> >>> >
> >>> >Best,
> >>> >Jane
> >>> >
> >>> >On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪  wrote:
> >>> >
> >>> >> Flink, the Kafka connector, and the JDBC connector are all version 1.15.2.
> >>> >>
> >>> >> On 2023-03-01 18:14:35, "陈佳豪" wrote:
> >>> >> >The problem is as the subject says: when a delete happens, the MySQL table data is wrong; every time, the old data for the current primary key is re-inserted.
> >>> >> >String kafka = "CREATE TABLE `电话` (`rowid`
> >>> >> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
> >>> >> VARCHAR(2147483647),`63fd660536521f81a2cfabad`
> >>> >> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH (
> >>> >> 'connector' = 'kafka', 'topic' =
> >>> >> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
> >>> >> 'properties.bootstrap.servers' = '132.232.27.116:9092',
> >>> >> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json'
> )";

Re: Flink async HBase causing a Too many open files exception

2023-03-08 Post by Ran Tao
+1, I have run into a similar fd leak. Note that in close() the buffered data should be flushed first, then the resources closed and pending futures cancelled (a rough sketch follows).

Best Regards,
Ran Tao
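For example, a rough sketch combining the two suggestions in this thread (class and field names are illustrative, not the poster's actual code; HBaseClient is the asynchbase client seen in the stack trace):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.hbase.async.HBaseClient;

// Illustrative sketch: create the asynchbase client in open() and release it
// in close(), so a failover does not leak selector threads / file descriptors.
public abstract class HbaseDimAsyncFunc<IN, OUT> extends RichAsyncFunction<IN, OUT> {

    private transient HBaseClient client;

    @Override
    public void open(Configuration parameters) {
        client = new HBaseClient("zk-host:2181"); // placeholder ZooKeeper quorum
    }

    @Override
    public void close() throws Exception {
        // Cancel or complete any still-pending lookups first (per the note
        // above), then shut the client down: shutdown() flushes buffered RPCs
        // and closes the underlying channels; join() waits for it to finish.
        if (client != null) {
            client.shutdown().join();
            client = null;
        }
        super.close();
    }
}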


On Wed, Mar 8, 2023 at 16:52, Weihua Hu wrote:

> Hi,
>
> Looking at the code, the job does indeed leak HBaseClient resources on failover.
>
> Override the close() method in HbaseDimensionAsyncFunc and release the HBaseClient there.
>
> Best,
> Weihua
>
>
> On Wed, Mar 8, 2023 at 4:19 PM aiden <18765295...@163.com> wrote:
>
> > Hi
> >   When using async HBase I frequently run into a too many open files exception; after the job restarts automatically, it immediately fails again. The error log is as follows:
> > 2023-03-08 16:15:39
> > org.jboss.netty.channel.ChannelException: Failed to create a selector.
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> > at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> > at
> org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> > at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> > at
> >
> com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> > at
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> > at
> >
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> > at
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.io.IOException: Too many open files
> > at sun.nio.ch.IOUtil.makePipe(Native Method)
> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> > at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at
> >
> org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> > at
> >
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> > ... 25 more
> >   Monitoring the number of file descriptors the job uses, I found that after the job throws the following error and restarts automatically, the file descriptor count spikes. The error log is as follows:
> > java.io.IOException: Could not perform checkpoint 5 for operator async
> > wait operator (2/9)#0.
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> > at
> >
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> > at
> >
> 

Re: Flink async HBase causing a Too many open files exception

2023-03-08 Post by Weihua Hu
Hi,

Looking at the code, the job does indeed leak HBaseClient resources on failover.

Override the close() method in HbaseDimensionAsyncFunc and release the HBaseClient there.

Best,
Weihua


On Wed, Mar 8, 2023 at 4:19 PM aiden <18765295...@163.com> wrote:

> [quoted message trimmed; identical to the message quoted in the reply above]