Unsubscribe
Reply: Re: flink: how to emit state on a schedule even when no data is flowing
Timers fire without needing incoming data; you can register a timer for 8 AM and it will fire on schedule. The specific steps are easy to look up. -- Original message -- From: Hongyuan Ma http://apache-flink.147419.n8.nabble.com/
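The scheduling arithmetic behind the daily timer described above can be sketched in plain Python (the function name and the Flink wiring in the comment are my own illustration, not code from the thread):

```python
from datetime import datetime, timedelta

def next_fire_time(now: datetime, hour: int = 8) -> datetime:
    """Next occurrence of `hour`:00 strictly after `now`; this is the
    timestamp a daily processing-time timer would be (re-)registered for."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

# In Flink terms (sketch): processElement() registers a processing-time
# timer for next_fire_time(...); onTimer() writes the state out and
# re-registers for the next day, so firing no longer depends on a record
# arriving near 8 AM.
print(next_fire_time(datetime(2021, 3, 1, 15, 16)))  # -> 2021-03-02 08:00:00
```

Note that a processing-time timer, once registered for a key, fires by wall clock regardless of whether further records arrive; only the initial registration needs at least one record per key.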
Re: flink: how to emit state on a schedule even when no data is flowing
I would like to know this too ◕‿◕. onTimer timers do not fire by system time. On 2021-03-01 15:16, dushang wrote: I want to write state to MySQL at 8 AM every morning. With a timer, a record has to arrive around 8 AM and the time has to be checked before the state can be emitted. How can state still be emitted on schedule when no records arrive? Thanks -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink: how to emit state on a schedule even when no data is flowing
I want to write state to MySQL at 8 AM every morning. With a timer, a record has to arrive around 8 AM and the time has to be checked before the state can be emitted. How can state still be emitted on schedule when no records arrive? Thanks -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.10.2 Unknown operation 108
The image didn't come through; please resend it via an image host.
> On Mar 1, 2021, at 14:46, xushanshan <1337220...@qq.com> wrote:
>
> flink 1.10.2: a job written in SQL keeps reporting the error shown below around midnight, but the job does not stop and the data is correct. What could be the cause?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Question about extending a Flink SQL connector
While upgrading from an older version to 1.12, I found that DefinedProctimeAttribute, which we need in order to support proctime and event time, has been deprecated. Yet the official documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E4%BD%BF%E7%94%A8-tablesource-%E5%AE%9A%E4%B9%89-1 still uses DefinedProctimeAttribute in its example and does not name a replacement. What should be used instead? guaishushu1...@163.com
Re: flink 1.12.2-rc2 hit by a cryptomining attack
Can you share more details? Is it confirmed that this was caused by Flink? Which vulnerability do you suspect? 1.12.2 rc2 is currently in the release voting stage; we would like to confirm as soon as possible whether a security vulnerability exists and fix it promptly (if there is one). Thanks.
Best Regards,
Yu
On Mon, 1 Mar 2021 at 13:26, macdoor wrote:
> My build of flink 1.12.2-rc2 was hit by a cryptominer. Wasn't this vulnerability closed before?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
flink 1.10.2 Unknown operation 108
flink 1.10.2: a job written in SQL keeps reporting the error shown below around midnight, but the job does not stop and the data is correct. What could be the cause? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.12 Application Mode errors on Alibaba Cloud managed Kubernetes
This is caused by the Alibaba Cloud LoadBalancer's health probe repeatedly sending RST to Flink's REST endpoint. There is a ticket tracking it [1], but it has not been fixed yet. In the short term you can avoid the noise by setting the log level of the org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint package to WARN in your log4j configuration.
[1]. https://issues.apache.org/jira/browse/FLINK-18129
Best,
Yang
王 羽凡 wrote on Mon, Mar 1, 2021 at 13:01:
> Launching Flink 1.12 Application Mode on Alibaba Cloud managed Kubernetes
> (ACK) produces some errors that do not appear on a self-built Kubernetes cluster.
> The taskmanager containers start normally, though, and subsequent jobs run
> fine. How should this error be handled? Is Flink incompatible with ACK?
>
> Launch command:
> ./bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.cluster-id=demo \
> -Dkubernetes.container.image=xx.xx.xx/xx/xxx:2.0.12 \
> local:///opt/flink/usrlib/my-flink-job.jar
>
> Logs:
> 2021-03-01 04:52:06,518 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 6eb4027586e7137b20ecc8c3ce624417 is submitted.
> 2021-03-01 04:52:06,518 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=6eb4027586e7137b20ecc8c3ce624417.
> 2021-03-01 04:52:08,303 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
> 2021-03-01 04:52:08,303 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
> 2021-03-01 04:52:08,306 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - ResourceManager akka.tcp://flink@demo.default:6123/user/rpc/resourcemanager_0 was granted leadership with fencing token
> 2021-03-01 04:52:08,310 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Starting the SlotManager.
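Yang's workaround, spelled out as a log4j2 properties fragment (a sketch: the logger name comes from the reply, the `restendpoint` logger id is my own, and the layout assumes Flink's default log4j2 setup):

```properties
# Quiet the REST endpoint logger that the LoadBalancer health probes spam
# (workaround suggested until FLINK-18129 is fixed).
logger.restendpoint.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
logger.restendpoint.level = WARN
```

Note the "Unhandled exception" entries in the logs above are themselves at WARN level; if those are the noise you want gone, ERROR may be needed instead, though the reply itself suggests WARN.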
flink 1.12.2-rc2 hit by a cryptomining attack
My build of flink 1.12.2-rc2 was hit by a cryptominer. Wasn't this vulnerability closed before? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Flink 1.12 Application Mode errors on Alibaba Cloud managed Kubernetes
Launching Flink 1.12 Application Mode on Alibaba Cloud managed Kubernetes (ACK) produces some errors that do not appear on a self-built Kubernetes cluster. The taskmanager containers start normally, though, and subsequent jobs run fine. How should this error be handled? Is Flink incompatible with ACK?
Launch command:
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=demo \
-Dkubernetes.container.image=xx.xx.xx/xx/xxx:2.0.12 \
local:///opt/flink/usrlib/my-flink-job.jar
Logs:
2021-03-01 04:52:06,518 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 6eb4027586e7137b20ecc8c3ce624417 is submitted.
2021-03-01 04:52:06,518 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=6eb4027586e7137b20ecc8c3ce624417.
2021-03-01 04:52:08,303 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2021-03-01 04:52:08,303 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2021-03-01 04:52:08,306 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - ResourceManager akka.tcp://flink@demo.default:6123/user/rpc/resourcemanager_0 was granted leadership with fencing token
2021-03-01 04:52:08,310 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - Starting the SlotManager.
2021-03-01 04:52:08,596 WARN org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Unhandled exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_275]
at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-03-01 04:52:08,596 WARN org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Unhandled exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_275]
at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.12-1.12.0.jar:1.12.0]
Flink job keeps failing and retrying: producer id is not assigned to its transactional id
hi, all
Flink producing to Kafka fails and the job keeps retrying.
What I have observed: each job runs normally for roughly 20 days after startup, then this problem appears and the job retries endlessly. I have not found the real root cause.
Error:
org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)
Kafka producer configuration:
// InstanceAlreadyExistsException
prop.setProperty("client.id", "")
// raise the producer's transaction timeout, transaction.timeout.ms
prop.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
prop.setProperty("max_in_flight_requests_per_connection", "1")
// idempotent producer, ENABLE_IDEMPOTENCE_CONFIG
prop.setProperty("enable_idempotence_config", "true")
// RETRIES_CONFIG
prop.setProperty("retries_config", "5")
val kafkaSink: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
  topic,
  new ResultStringKafkaSerializationSchema(topic),
  prop,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)
Flink is configured with EXACTLY_ONCE as well:
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
Thanks everyone!
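An aside for readers of the config above: several of its property keys look like Java constant names rather than the dotted keys Kafka's producer actually reads, so those settings are likely being ignored. A small sketch of the dotted forms (key names per the Kafka producer configuration reference; whether this relates to the transaction failure is not established here):

```python
# The dotted property keys Kafka's ProducerConfig recognizes, next to the
# underscore-style names used in the message above (which Kafka ignores).
corrected = {
    "transaction.timeout.ms": str(1000 * 60 * 5),  # already correct above
    "max.in.flight.requests.per.connection": "1",  # was "max_in_flight_requests_per_connection"
    "enable.idempotence": "true",                  # was "enable_idempotence_config"
    "retries": "5",                                # was "retries_config"
}
# Kafka config keys are dot-separated, never underscored:
assert all("_" not in key for key in corrected)
print(corrected["enable.idempotence"])  # -> true
```

In the Scala snippet these would be passed the same way, e.g. `prop.setProperty("enable.idempotence", "true")`.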
Re: Re: flink1.12 sql_query() on the same source tables: pyflink takes 9 min, java takes 3 s
Hi, Xingbo,
Thanks a lot for the reply. The reason for converting to pandas is to use pandas' matrix-computation routines. The project needs Flink to do the computation and return the result directly to the front end, so the pipeline has a source but no sink. I also tried defining a pandas-type UDAF and returning the aggregated result, but the execution time barely changed.
So, for this kind of use case, is there a recommended way to run it, i.e. using pandas or numpy matrix computation while keeping performance? Many thanks!
On 2021-03-01 09:54:49, "Xingbo Huang" wrote:
>Hi,
>
>The difference is that you used to_pandas(), which is slow: it has to collect all the data back to the client and build a Python DataFrame. to_pandas() is generally a debugging convenience; it does not perform well. If you care about performance, just switch to a different sink.
>
>Best
>Xingbo
>
>xiaoyue <18242988...@163.com> wrote on Fri, Feb 26, 2021 at 12:38:
>
>> Has anyone run into this? In a streaming environment connected to a MySQL
>> database, two source tables source1 and source2 are defined via DDL.
>> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # fetch the query result
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> The same processing runs much slower in Python than in Java. What causes this?
>> Since Python is only a wrapper around the Flink API, could it be the GIL?
>> Hoping someone can explain; anyone who has hit the same issue is welcome
>> to discuss. Thanks!
>>
>>
Pyflink and HBase
Re: flink1.12 sql_query() on the same source tables: pyflink takes 9 min, java takes 3 s
Hi,
The difference is that you used to_pandas(), which is slow: it has to collect all the data back to the client and build a Python DataFrame. to_pandas() is generally a debugging convenience; it does not perform well. If you care about performance, just switch to a different sink.
Best
Xingbo
xiaoyue <18242988...@163.com> wrote on Fri, Feb 26, 2021 at 12:38:
> Has anyone run into this? In a streaming environment connected to a MySQL
> database, two source tables source1 and source2 are defined via DDL.
> sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
> = source2.ID WHERE ID = '123456' AND DATE BETWEEN '20160701' AND
> '20170307'"
> # fetch the query result
> query_table = env.sql_query(sql)
> query_table.to_pandas()
> The same processing runs much slower in Python than in Java. What causes this?
> Since Python is only a wrapper around the Flink API, could it be the GIL?
> Hoping someone can explain; anyone who has hit the same issue is welcome to
> discuss. Thanks!
>
>
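The "use a different sink" advice can be sketched in pyflink roughly as below. This is a non-runnable sketch, essentially sink-table configuration: it assumes the `env` and `sql` variables from the question, a JDBC connector jar on the classpath, and the connector options and table names are illustrative, not from the thread.

```python
# Sketch: instead of query_table.to_pandas(), which ships every row back
# to the client, declare a sink table and let the cluster write the result.
# Connector options and names below are hypothetical.
env.execute_sql("""
    CREATE TABLE result_sink (
        ID STRING, NAME STRING, IP STRING, PHONE STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/mydb',
        'table-name' = 'query_result'
    )
""")
env.sql_query(sql).execute_insert("result_sink")
```

The follow-up question (matrix math on the result) would then read the sink table, or move the numpy/pandas work into a vectorized ("pandas") UDF so it runs on the cluster rather than after a client-side collect.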