Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 Thread Zhefu PENG
Hi all,

这封邮件最开始发出已经一个月了,这一个月里尝试了很多朋友或者各位大佬的建议,目前经过一周末加上两个工作日的查看,问题看来是解决了。

问题的根本原因:Kafka集群的性能不足(怀疑是CPU负荷过大)。问题出现的时候线上kakfa集群只有七台机器,在排除所有别的原因以及能进行到的尝试方案后,决定进行扩容。扩到15台机器。目前来看,平稳运行,没有再报出类似错误。

反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。

Best,
Zhefu

LakeShen  于2020年6月12日周五 上午9:49写道:

> Hi ZheFu,
>
> 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
> 的数据是否都已经 Sink 到了 kafka.
>
> 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
> 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
>
> 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
>
> Best,
> LakeShen
>
> Congxian Qiu  于2020年6月11日周四 上午9:50写道:
>
> > Hi
> >
> > 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
> > java.lang.IllegalStateException: Pending record count must be zero at
> this
> > point: 5”,需要看一下为什么会走到这里
> >
> > Best,
> > Congxian
> >
> >
> > 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
> >
> > >
> > >
> >
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
> > >
> > > > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> > > >
> > > > 补充一下,在TaskManager发现了如下错误日志:
> > > >
> > > > 2020-06-10 12:44:40,688 ERROR
> > > > org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> > > > during disposal of stream operator.
> > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed
> > > to
> > > > send data to Kafka: Pending record count must be zero at this point:
> 5
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > > > at
> > > >
> > >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > > > at java.lang.Thread.run(Thread.java:748)
> > > > Caused by: java.lang.IllegalStateException: Pending record count must
> > be
> > > > zero at this point: 5
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > > > ... 8 more
> > > >
> > > > 希望得到帮助,感谢!
> > > >
> > > >
> > > > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> > > >
> > > >> Hi all,
> > > >>
> > > >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> > > >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> > > Field_Filter
> > > >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
> > > >>
> > > >>
> > > >>
> > >
> >
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> > > >>
> > > >> 部分报错信息如下:
> > > >> 2020-06-10 12:02:49,083 INFO
> > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > Triggering
> > > >> checkpoint 1 @ 1591761769060 for job
> c41f4811262db1c4c270b136571c8201.
> > > >> 2020-06-10 12:04:47,898 INFO
> > > >> org.apache.flink.runtime.checkpoint

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-09 Thread Zhefu PENG
Hi Yichao,

感谢你的回复。因为这个任务已经上线大概一周了,今天才报出这个问题,我们后面会增大间隔并测试。同时,我在刚刚也有回复,我在TM也查到了一些相关日志:
2020-06-10 12:44:40,688 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Pending record count must be zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be
zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
... 8 more

是不是还是和checkpoint的设立间隔过短有关呢?希望回复,感谢!

Best,
Zhefu

Yichao Yang <1048262...@qq.com> 于2020年6月10日周三 下午1:24写道:

> Hi
>
>
>
> 看报错是checkpoint失败次数超过了最大限制导致任务失败。checkpoint间隔设置太小了,在我们团队通常都是分钟级别的interval,我们一般设置5分钟,checkpoint只是一个容错机制,没有特殊的需求场景不需要设置间隔那么短,并且频繁checkpoint会导致性能问题。
>
>
> Best,
> Yichao Yang
>
>
> -- 原始邮件 --
> 发件人: Zhefu PENG  发送时间: 2020年6月10日 13:04
> 收件人: user-zh  主题: 回复:flink任务checkpoint无法完成snapshot,且报kafka异常
>
>
>
> Hi all,
>
> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> Source: Custom Source - Map - Source_Map - Empty_Filer -
> Field_Filter
> - Type_Filter - Value_Filter - Map - Map - Map -
> Sink: Unnamed
>
>
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
>
> 部分报错信息如下:
> 2020-06-10 12:02:49,083 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering
> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> 2020-06-10 12:04:47,898 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Decline
> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> c41f4811262db1c4c270b136571c8201 at
> container_e27_1591466310139_21670_01_06 @
> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> 2020-06-10 12:04:47,899 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Discarding
> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
> snapshot 1 for operator Source: Custom Source - Map - Source_Map
> -
> Empty_Filer - Field_Filter - Type_Filter - Value_Filter -
> Map - Map -
> Map - Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
> at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-09 Thread Zhefu PENG
补充一下,在TaskManager发现了如下错误日志:

2020-06-10 12:44:40,688 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: Pending record count must be zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be
zero at this point: 5
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
... 8 more

希望得到帮助,感谢!


Zhefu PENG  于2020年6月10日周三 下午1:03写道:

> Hi all,
>
> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>
>
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
>
> 部分报错信息如下:
> 2020-06-10 12:02:49,083 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
> 2020-06-10 12:04:47,898 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> c41f4811262db1c4c270b136571c8201 at
> container_e27_1591466310139_21670_01_06 @
> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> 2020-06-10 12:04:47,899 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map
> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map
> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtim

flink任务checkpoint无法完成snapshot,且报kafka异常

2020-06-09 Thread Zhefu PENG
Hi all,

现在有一个简单的flink任务,大概chain在一起后的执行图为:
Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
-> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed

但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。

部分报错信息如下:
2020-06-10 12:02:49,083 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
2020-06-10 12:04:47,898 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
c41f4811262db1c4c270b136571c8201 at
container_e27_1591466310139_21670_01_06 @
hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
2020-06-10 12:04:47,899 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 1 for operator Source: Custom Source -> Map -> Source_Map ->
Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map ->
Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: The server disconnected before a response was
received.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:973)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:892)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:98)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:317)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:978)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotStat(AbstractStreamOperator.java:402)
... 18 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server
disconnected before a response was received.
2020-06-10 12:04:47,913 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.
at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87)
at

Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

2020-05-07 Thread Zhefu PENG
Hi Jingsong,

如你所说,[1]就是一种增量更新方差的方式,这个也是我之前查到的一种方式,并打算在add内使用;但是我的问题是,在merge的时候,怎么让已经在之前算过的两批数据结果的准确性呢?比如,一个节点算过1,2,3的数据,另一个节点算过1,3,4的数据,ACC会保存总和,个数,方差值;但是在merge时候,借用ACC里留存的数据,使用[1]的增量更新方式,
就会造成结果的偏差啊(原因是因为数据需要去重,merge以后实现的其实是1,2,3,4的数据的方差计算,而不是1,2,3,1,3,4的计算)。这个怎么解决呢?

关于使用built-in的variance功能,我会去看一下,感谢你的帮助

Best,
Zhefu

Jingsong Li  于2020年5月7日周四 上午9:51写道:

> Hi,
>
> 重新计算一遍当然是正确的。
>
> 一个方式是参考Hive[1], Agg buffer需要保存count,sum,variance.
>
> 另一个方式是考虑分离distinct和variance,你试过直接用flink内置函数吗?比如variance(distinct item)?
> 当内置函数为variance时,会做一些特殊的plan改写优化。
>
> [1]
>
> https://github.com/apache/hive/blob/0966a383d48348c36c270ddbcba2b4516c6f3a24/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFVariance.java
>
> Best,
> Jingsong Lee
>
> On Thu, May 7, 2020 at 9:28 AM Zhefu PENG  wrote:
>
> > Hi Benchao & Jingsong,
> >
> > 谢谢你们的回复。的确使用sliding time window也是需要实现merge的。
> >
> >
> 这里有个额外问题想问一下Jingsong,就是在我当前这个需求场景下,能否给一些提示,关于如果merge以后,怎么准确又高效地更新结果的方差值呢?我目前想到的是在add里面可以增量更新方差值,同时用set记录每个出现的数值,然后在merge的时候,将set进行合并,重新计算一遍。保证结果的准确。(没有想到以增量更新的方式准确更新merge后的结果)。可以些idea吗
> >
> > Thanks,
> > Zhefu
> >
> > On Wed, May 6, 2020 at 22:00 Jingsong Li  wrote:
> >
> > > Hi,
> > >
> > > 首先merge会不会导致结果不准确是完全看你的实现,这里你 的需求看起来并没有实现上的困难。
> > >
> > > merge这个接口就是为了优化性能才提出的,就像Benchao说的那样,如果你实现了merge,引擎会给做一些pane的优化。
> > >
> > > 当然前提是merge是有效率提升的,具体根据你的实现和测试来判断。
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Wed, May 6, 2020 at 9:22 PM Benchao Li  wrote:
> > >
> > > > Hi,
> > > >
> > > > 如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
> > > > 会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
> > > > window的pane都merge起来。
> > > >
> > > > Zhefu PENG  于2020年5月6日周三 下午9:05写道:
> > > >
> > > > > 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> > > > > window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
> > > > >
> > > > > On Wed, May 6, 2020 at 20:49 1193216154 <1193216...@qq.com> wrote:
> > > > >
> > > > > > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> > > > > >
> > > > > >
> > > > > >
> > > > > > ---原始邮件---
> > > > > > 发件人: "Zhefu PENG" > > > > > 发送时间: 2020年5月6日(周三) 晚上8:34
> > > > > > 收件人: "user-zh" > > > > > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> > > > > >
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> > > > > >
> > > > > > 代码(简单伪代码)如下:
> > > > > > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> > > > > > 第二项是计算的方差结果,输出的结果是
> > > > > > class MergeAggregateExample extends AggregateFunction[Double,
> (Set,
> > > > > > Double), Double] {
> > > > > >  override def createAccumulator(): (Set,
> Double)
> > =
> > > {
> > > > > >  //Set用来对数据进行去重, 后一项是计算方差后的结果
> > > > > >  (Set(), 0.0)
> > > > > >  }
> > > > > >
> > > > > >  override def add(item: Double, accumulator:
> > (Set,
> > > > > > Double)): (Set,
> > > > > > Double) = {
> > > > > >  //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> > > > > > 并且增量更新方差;如果没出现过,直接跳过
> > > > > >  new Tuple2(Set, Double)
> > > > > >  }
> > > > > >
> > > > > >
> > > > > >  override def getResult(accumulator: (Set,
> > > Double)):
> > > > > > Double = {
> > > > > >  // 返回计算的方差结果
> > > > > >  accumulator._2
> > > > > >  }
> > > > > >
> > > > > >  override def merge(a: (Set, Double), b: (Set,
> > > > > Double)):
> > > > > > (Set, Double) =
> > > > > > {
> > > > > >  //
> > 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > > > > >  }
> > > > > >
> > > > > >
> > > > > > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> > > > > >
> > > > > > Looking forward to your reply and help.
> > > > > >
> > > > > > Best,
> > > > > > Zhefu
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Benchao Li
> > > > School of Electronics Engineering and Computer Science, Peking
> > University
> > > > Tel:+86-15650713730
> > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

2020-05-06 Thread Zhefu PENG
Hi Benchao & Jingsong,

谢谢你们的回复。的确使用sliding time window也是需要实现merge的。
这里有个额外问题想问一下Jingsong,就是在我当前这个需求场景下,能否给一些提示,关于如果merge以后,怎么准确又高效地更新结果的方差值呢?我目前想到的是在add里面可以增量更新方差值,同时用set记录每个出现的数值,然后在merge的时候,将set进行合并,重新计算一遍。保证结果的准确。(没有想到以增量更新的方式准确更新merge后的结果)。可以些idea吗

Thanks,
Zhefu

On Wed, May 6, 2020 at 22:00 Jingsong Li  wrote:

> Hi,
>
> 首先merge会不会导致结果不准确是完全看你的实现,这里你 的需求看起来并没有实现上的困难。
>
> merge这个接口就是为了优化性能才提出的,就像Benchao说的那样,如果你实现了merge,引擎会给做一些pane的优化。
>
> 当然前提是merge是有效率提升的,具体根据你的实现和测试来判断。
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 9:22 PM Benchao Li  wrote:
>
> > Hi,
> >
> > 如果你用的sliding window,应该是也需要实现merge的。因为sliding window本身做了一个pane的优化。
> > 会按照sliding size和window size取最大公约数作为pane,然后在trigger window计算的时候,把属于这个
> > window的pane都merge起来。
> >
> > Zhefu PENG  于2020年5月6日周三 下午9:05写道:
> >
> > > 非常感谢。那我是不是能理解为:我在这里用的是sliding time
> > > window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?
> > >
> > > On Wed, May 6, 2020 at 20:49 1193216154 <1193216...@qq.com> wrote:
> > >
> > > > 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
> > > >
> > > >
> > > >
> > > > ---原始邮件---
> > > > 发件人: "Zhefu PENG" > > > 发送时间: 2020年5月6日(周三) 晚上8:34
> > > > 收件人: "user-zh" > > > 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
> > > >
> > > >
> > > > Hi all,
> > > >
> > > >
> > > >
> > >
> >
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
> > > >
> > > >
> > > >
> > >
> >
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
> > > >
> > > >
> > >
> >
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
> > > >
> > > > 代码(简单伪代码)如下:
> > > > 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> > > > 第二项是计算的方差结果,输出的结果是
> > > > class MergeAggregateExample extends AggregateFunction[Double, (Set,
> > > > Double), Double] {
> > > >  override def createAccumulator(): (Set, Double) =
> {
> > > >  //Set用来对数据进行去重, 后一项是计算方差后的结果
> > > >  (Set(), 0.0)
> > > >  }
> > > >
> > > >  override def add(item: Double, accumulator: (Set,
> > > > Double)): (Set,
> > > > Double) = {
> > > >  //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> > > > 并且增量更新方差;如果没出现过,直接跳过
> > > >  new Tuple2(Set, Double)
> > > >  }
> > > >
> > > >
> > > >  override def getResult(accumulator: (Set,
> Double)):
> > > > Double = {
> > > >  // 返回计算的方差结果
> > > >  accumulator._2
> > > >  }
> > > >
> > > >  override def merge(a: (Set, Double), b: (Set,
> > > Double)):
> > > > (Set, Double) =
> > > > {
> > > >  // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
> > > >  }
> > > >
> > > >
> > > > 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
> > > >
> > > > Looking forward to your reply and help.
> > > >
> > > > Best,
> > > > Zhefu
> > >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink窗口函数AggregateFunction中,merge的作用和应用场景

2020-05-06 Thread Zhefu PENG
非常感谢。那我是不是能理解为:我在这里用的是sliding time
window,就不会用到merge功能,所以merge的功能不会影响到我结果的准确性?

On Wed, May 6, 2020 at 20:49 1193216154 <1193216...@qq.com> wrote:

> 官网有说,merge是在使用session窗口的时候用到,因为需要合并窗口
>
>
>
> ---原始邮件---
> 发件人: "Zhefu PENG" 发送时间: 2020年5月6日(周三) 晚上8:34
> 收件人: "user-zh" 主题: flink窗口函数AggregateFunction中,merge的作用和应用场景
>
>
> Hi all,
>
>
> 在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。
>
>
> 之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
>
> 但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。
>
> 代码(简单伪代码)如下:
> 简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
> 第二项是计算的方差结果,输出的结果是
> class MergeAggregateExample extends AggregateFunction[Double, (Set,
> Double), Double] {
>  override def createAccumulator(): (Set, Double) = {
>  //Set用来对数据进行去重, 后一项是计算方差后的结果
>  (Set(), 0.0)
>  }
>
>  override def add(item: Double, accumulator: (Set,
> Double)): (Set,
> Double) = {
>  //判断新来的数据是否之前出现过,如果之前没出现过: 更新set,
> 并且增量更新方差;如果没出现过,直接跳过
>  new Tuple2(Set, Double)
>  }
>
>
>  override def getResult(accumulator: (Set, Double)):
> Double = {
>  // 返回计算的方差结果
>  accumulator._2
>  }
>
>  override def merge(a: (Set, Double), b: (Set, Double)):
> (Set, Double) =
> {
>  // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
>  }
>
>
> 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。
>
> Looking forward to your reply and help.
>
> Best,
> Zhefu


flink窗口函数AggregateFunction中,merge的作用和应用场景

2020-05-06 Thread Zhefu PENG
Hi all,

在使用增量更新窗口函数AggregateFunction的时候,需要重新定义merge函数。在查阅了一些资料后,有些资料(博客)说,merge函数的作用是对于两个跨节点之间的ACC的合并;还有位朋友说是对于状态的恢复,一般不会用上;查了一下别的资料并没有相关的说明,在此想问并确定一下具体作用以及可能的使用场景。

之所以需要了解和确定该函数功能,是现在我有这样一个需求场景:我想流式的对于数据的中的某个字段值的进行方差计算并进行增量更新,这样可以不用储存原始数据,只存储中间结果,从而节省运行内存。
但是如果merge是对于跨节点之间的ACC进行合并,那会造成结果的不准确,比如:一个节点上有1,4,3(去重后)的数据和计算后的方差,另一个节点上有2,3,4(去重后)的数据和计算后的方差,这种方式能保证节点内的结果是准确的,但是合并后的结果就不够准确。

代码(简单伪代码)如下:
简单说明:为了简化,输入的数据是个Double, ACC的数据类型是个Tuple2, 第一项是数据的Set(保证去重),
第二项是计算的方差结果,输出的结果是
class MergeAggregateExample extends AggregateFunction[Double, (Set,
Double), Double] {
override def createAccumulator(): (Set, Double) = {
  //Set用来对数据进行去重, 后一项是计算方差后的结果
  (Set(), 0.0)
}

override def add(item: Double, accumulator: (Set, Double)): (Set,
Double) = {
  //判断新来的数据是否之前出现过,如果之前没出现过: 更新set, 并且增量更新方差;如果没出现过,直接跳过
  new Tuple2(Set, Double)
}


override def getResult(accumulator: (Set, Double)): Double = {
  // 返回计算的方差结果
  accumulator._2
}

override def merge(a: (Set, Double), b: (Set, Double)): (Set, Double) =
{
  // 未知,如果是直接做类似add内的增量更新操作,是否会导致结果的不准确?
  }


目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,最好能针对我提到的这种需求场景,非常感谢。

Looking forward to your reply and help.

Best,
Zhefu


Re: windows用户使用pyflink问题

2020-04-27 Thread Zhefu PENG
可以尝试在external lib把site-packages下的内容都添加进去,可以帮助提升开发效率。

On Tue, Apr 28, 2020 at 10:13 tao siyuan  wrote:

> 目前,pycharm不支持pyflink开发,请问在windows
> 系统下,还有其他有效工具或者方式能更加方便得支持pyflink的开发,debugging,源码track吗?
>


Re: 任务假死

2020-04-25 Thread Zhefu PENG
图好像挂了看不到。是不是和这两个场景描述比较相似

[1] http://apache-flink.147419.n8.nabble.com/flink-kafka-td2386.html
[2]  http://apache-flink.147419.n8.nabble.com/Flink-Kafka-td2390.html
On Sun, Apr 26, 2020 at 10:58 yanggang_it_job 
wrote:

> 1、Flink-UI截图
> 我任务的启动配置是 -p 200 -ys 5,也就是说会有40个TM运行,之前正常运行的时候确实是40个TM,而现在只有1个TM在运行;
> 同时通过观察数据的同步时间,任务已经在两天前停止写入数据了,但是查看任务的状态却是RUNNING;
> 我的理解是当tm申请失败,那么当前任务就会退出并把状态设置为FINISHED,但是真实情况却是上面描述那样。
> 请问为什么会出现这种情况呢?
>
> thanks
>
>
>
>
>
>


Re: Flink 运行一段时间后不再消费Kafka中数据

2020-04-09 Thread Zhefu PENG
我也是遇到了相同的这个问题!你那边只是不再读取kafka数据吗?其他算子还正常吗?

On Thu, Apr 9, 2020 at 18:57 季 鸿飞  wrote:

> 嗨,大家好
>
> 我遇到了一个很棘手的问题,我的程序运行了15天了,每天收同等数据量的数据,之前每天都很正常,可今天job不再消费Kafka里的数据了。
>
> 一切看起来都很正常,log日志中也没有报错,不知什么原因导致了这个问题。
>
>
>
>
>
> Hi,here
>
> I met a very difficult problem. My program has been running for 15 days,
> receiving data of the same amount every day. Before that, it was normal
> every day, but today, job no longer consumes data in Kafka.
>
> Everything seems to be normal. There is no error in the log log. I don't
> know what causes this problem.
>
>
>
>
>
> 发送自 Windows 10 版邮件 应用
>
>
>


flink读取kafka数据进程卡死

2020-04-09 Thread Zhefu PENG
Hi all,

最近在使用flink读取kafka读取数据的时候,遇到这么个情况:

flink任务会突然在某一个时间点卡死,进程不报错,也不会fail然后退出,只会一直卡在那里也不会kafka数据,查看jobmanager和taskmanager的日志都没有报错信息,只有启动任务时候的信息或者要我设置要print的一些信息。

我的代码逻辑大概是这样:从kafka读取数据,会经过cep进行匹配,然后选取匹配命中的数据,进行聚合操作,以及窗口函数的自定义操作。

配置大概为: 2G的JobManager,2*8G的TaskManager,并行度总共为4
Kafka的数据量大概:每秒5000左右。

主要是查了很多日志没有报错信息,本身job也不会fail,这个情况让我比较挠头。请问有朋友遇到过这种情况吗?或者希望能给到更多的建议。非常谢谢

Best,
Zhefu


[flink-1.10]有关使用cep功能时times的问题以及pyflink table api的聚合计算操作问题

2020-04-02 Thread Zhefu PENG
Hi all,

最近在做flink相关的使用和开发,现在遇到了两个问题:

1.当使用cep功能时, pattern本身的积累功能有一个Pattern.times()的接口,
当我在这个times中输入一个较大的数字(比如超过100,000及以上的数字)的时候,
启动相关的任务会非常耗时甚至耗时过久。比如我用100,000时, 启动时间一共花了15+min.

启动方式为:flink run thenameofjar.jar
不知道这是使用方式不对还是什么其他问题?

2. 当我们用pyflink的table api时, 会出现聚合计算的错误,代码和错误日志如下:

# 环境声明
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

schema声明
return Schema() \
.field('source', DataTypes.STRING()) \
.field('proctime',DataTypes.TIMESTAMP()).proctime()


# 窗口group计算
table3 = table1.window(Tumble.over("1.minutes").on("proctime").alias('w')) \
.group_by('w, source') \
.select("source")

error日志:

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/runpy.py", line 193, in
_run_module_as_main
"__main__", mod_spec)
  File "/usr/local/python3/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
  File "/flink_test/flink_udf_kafka.py", line 65, in 
.select("source")
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594pyflink.zip/pyflink/table/table.py",
line 784, in select
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o196.select.
: org.apache.flink.table.api.ValidationException: A group window expects a
time attribute for grouping in a stream environment.
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
at
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
at
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at