Unsubscribe

2021-02-28 Thread china_tao
Unsubscribe

Reply: Re: How can Flink output state on a schedule even when there is no incoming data stream?

2021-02-28 Thread smq
A timer does not need incoming data in order to fire. You can register a timer for 8 o'clock and it will trigger on schedule; the specific steps can be found by searching online.
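For illustration, a minimal sketch of this idea as a KeyedProcessFunction (class, field, and method names are made up; the state access and the JDBC write are left as comments). A processing-time timer is registered for the next 08:00 and re-registers itself in onTimer, so it keeps firing even when no new records arrive, as long as the key has been seen at least once:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical sketch: flushes per-key state every day at 08:00, driven purely by
// processing-time timers once the key has been seen at least once.
public class DailyStateFlushFunction extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // ... update your keyed state here ...
        // Registering the same timestamp twice is a no-op, so this is safe to call per record.
        ctx.timerService().registerProcessingTimeTimer(nextEightAm(ctx.timerService().currentProcessingTime()));
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // ... read the keyed state and write it to MySQL, or emit it downstream ...
        out.collect("state snapshot for key " + ctx.getCurrentKey());
        // Re-register for the following day so the timer keeps firing without new data.
        ctx.timerService().registerProcessingTimeTimer(nextEightAm(timestamp));
    }

    // Next 08:00 (system time zone) strictly after the given instant.
    private static long nextEightAm(long nowMillis) {
        ZonedDateTime now = ZonedDateTime.ofInstant(Instant.ofEpochMilli(nowMillis), ZoneId.systemDefault());
        ZonedDateTime next = now.toLocalDate().atTime(LocalTime.of(8, 0)).atZone(ZoneId.systemDefault());
        return (next.isAfter(now) ? next : next.plusDays(1)).toInstant().toEpochMilli();
    }
}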







-- Original Message --
From: Hongyuan Ma http://apache-flink.147419.n8.nabble.com/

Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-28 Thread Matthias Pohl
Hi Abhishek,
have you also tried to apply the instructions listed in [1]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/advanced/logging.html#configuring-log4j1

On Mon, Mar 1, 2021 at 4:42 AM Abhishek Shukla 
wrote:

> Hi Matthias,
> Thanks for replying,
> I checked both of these pages,
> And I downloaded the zip of flink 1.12.1 so the changes related to log4j2
> are there in property file,
>
> I am able to see the logs of the pipeline once the application is up, but the logs
> related to application failure, successful bean creation, or post-construct
> processing are not getting printed to the file, which was happening
> in Flink 1.9 with the provided log4j-cli.properties file.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How can Flink output state on a schedule even when there is no incoming data stream?

2021-02-28 Thread Hongyuan Ma
I would like to know too ◕‿◕. The onTimer timer isn't driven by system time.


On 2021-03-01 at 15:16, dushang wrote:
I want to write state to MySQL on a schedule every morning at 8 o'clock. With a timer, a record has to arrive around 8 o'clock so that the time check fires and the output happens. How can I still output the state on schedule when no data is flowing in? Thanks.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

How can Flink output state on a schedule even when there is no incoming data stream?

2021-02-28 Thread dushang
I want to write state to MySQL on a schedule every morning at 8 o'clock. With a timer, a record has to arrive around 8 o'clock so that the time check fires and the output happens. How can I still output the state on schedule when no data is flowing in? Thanks.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.10.2 Unknown operation 108

2021-02-28 Thread liang zhao
The image didn't come through; please resend it using an image hosting service.

> On March 1, 2021 at 14:46, xushanshan <1337220...@qq.com> wrote:
> 
> A Flink 1.10.2 job written in SQL keeps throwing the error shown below in the early morning hours, but the job does not stop and the data looks fine. What could be the cause?
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Flink SQL connector extension question

2021-02-28 Thread guaishushu1...@163.com
While upgrading from an older version to 1.12, I found, when adding support for proctime and event time, that the DefinedProctimeAttribute method has been deprecated. However, the example in the official documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E4%BD%BF%E7%94%A8-tablesource-%E5%AE%9A%E4%B9%89-1
still uses DefinedProctimeAttribute, and no replacement is mentioned.


guaishushu1...@163.com


Re: flink 1.12.2-rc2 hit by crypto mining

2021-02-28 Thread Yu Li
Could you give some more details? Have you confirmed that it was caused by Flink? Which vulnerability do you suspect?

1.12.2 RC2 is currently in the release voting phase; we would like to confirm as soon as possible whether a security vulnerability exists and, if so, fix it promptly. Thanks.

Best Regards,
Yu


On Mon, 1 Mar 2021 at 13:26, macdoor  wrote:

> The Flink 1.12.2-rc2 build I compiled was hijacked for crypto mining. Wasn't this vulnerability plugged before?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink 1.10.2 Unknown operation 108

2021-02-28 Thread xushanshan
A Flink 1.10.2 job written in SQL keeps throwing the error shown below in the early morning hours, but the job does not stop and the data looks fine. What could be the cause?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.12 Application Mode on Alibaba Cloud managed Kubernetes reports errors

2021-02-28 Thread Yang Wang
The actual cause is that the Alibaba Cloud LoadBalancer's health-check mechanism keeps sending RST packets to Flink's REST endpoint.
There is a ticket [1] tracking this issue, but it has not been fixed yet.

As a short-term workaround, you can set the log level of org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
to WARN in your log4j configuration to suppress it for now.
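For example, a sketch of what this could look like in Flink's conf/log4j.properties (log4j2 properties syntax, as shipped with Flink 1.11+). Note that the noisy messages are themselves logged at WARN, so ERROR (or OFF) may be needed rather than WARN to actually hide them:

# Temporary workaround for the LoadBalancer probe noise (see the ticket below)
logger.minidispatcher.name = org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint
logger.minidispatcher.level = ERROR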

[1]. https://issues.apache.org/jira/browse/FLINK-18129


Best,
Yang

王 羽凡 wrote on Monday, March 1, 2021 at 1:01 PM:

> When starting Flink 1.12 in Application Mode on Alibaba Cloud managed Kubernetes
> (ACK), we see some errors; the same errors do not appear on a self-managed Kubernetes cluster.
> However, the taskmanager containers start normally and subsequent tasks also run fine. How should these errors be handled? Is Flink incompatible with Alibaba Cloud ACK clusters?
>
> Launch command:
> ./bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.cluster-id=demo \
> -Dkubernetes.container.image=xx.xx.xx/xx/xxx:2.0.12 \
> local:///opt/flink/usrlib/my-flink-job.jar
>
> Logs:
> 2021-03-01 04:52:06,518 INFO
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Job 6eb4027586e7137b20ecc8c3ce624417 is submitted.
> 2021-03-01 04:52:06,518 INFO
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Submitting Job with JobId=6eb4027586e7137b20ecc8c3ce624417.
> 2021-03-01 04:52:08,303 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered
> 0 pods from previous attempts, current attempt id is 1.
> 2021-03-01 04:52:08,303 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
> 2021-03-01 04:52:08,306 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> ResourceManager akka.tcp://flink@demo.default:6123/user/rpc/resourcemanager_0
> was granted leadership with fencing token 
> 2021-03-01 04:52:08,310 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] -
> Starting the SlotManager.
> 2021-03-01 04:52:08,596 WARN
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Unhandled exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> [flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> 2021-03-01 04:52:08,596 WARN
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
> Unhandled exception
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
> at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> ~[?:1.8.0_275]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> 

Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

2021-02-28 Thread Yang Wang
Hi Alexey,

It seems that the KubernetesHAService works well since all the checkpoints
have been cleaned up when the job is canceled.
And we could find related logs "Found 0 checkpoints in
KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is
recovering from a wrong savepoint path. Could you share the
full JobManager logs? One possible reason I could guess is the application
cluster entrypoint is not creating a new JobGraph from the specified
arguments.


Best,
Yang

Alexey Trenikhun wrote on Saturday, February 27, 2021 at 1:48 AM:

> Hello,
> We have a Flink job running in Kubernetes with Kubernetes HA enabled (the JM is
> deployed as a Job, a single TM as a StatefulSet). We took a savepoint with
> cancel=true. Now when we try to start the job using --fromSavepoint *A*,
> where *A* is the path we got from taking the savepoint (ClusterEntrypoint reports
> *A* in the log), it looks like the job for some reason ignores the given *A* and
> actually tries to restore from some path *B* (CheckpointCoordinator logs
> *B* ):
>
> *{"ts":"2021-02-26T17:09:52.500Z","message":" Program
> Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-e8a201008f2c
> ","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:52.502Z","message":"
>  
> ","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":2}
>  *
> *...*
>
> *{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from
> KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in
> KubernetesStateHandleStore{configMapName='gsp--jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are
> already
> downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during
> restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.183Z","message":"Starting job
>  from savepoint
> wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-00-fbcd58f66685
> 
> (allowing non restored
> state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":2}
> {"ts":"2021-02-26T17:09:59.191Z","message":"0.0.7+g9bb29061\n  build
> 2021-02-21T21:13:31-0800\n  tag: 0.0.0.7\n  id:
> 0.0.0.7\n","logger_name":"com.Fsp","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":2}
> 

flink 1.12.2-rc2 hit by crypto mining

2021-02-28 Thread macdoor
The Flink 1.12.2-rc2 build I compiled was hijacked for crypto mining. Wasn't this vulnerability plugged before?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink 1.12 Application Mode on Alibaba Cloud managed Kubernetes reports errors

2021-02-28 Thread 王 羽凡
When starting Flink 1.12 in Application Mode on Alibaba Cloud managed Kubernetes
(ACK) we see some errors; the same errors do not appear on a self-managed Kubernetes cluster.
However, the taskmanager containers start normally and subsequent tasks also run fine. How should these errors be handled? Is Flink incompatible with Alibaba Cloud ACK clusters?

Launch command:
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=demo \
-Dkubernetes.container.image=xx.xx.xx/xx/xxx:2.0.12 \
local:///opt/flink/usrlib/my-flink-job.jar

Logs:
2021-03-01 04:52:06,518 INFO  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
Job 6eb4027586e7137b20ecc8c3ce624417 is submitted.
2021-03-01 04:52:06,518 INFO  
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - 
Submitting Job with JobId=6eb4027586e7137b20ecc8c3ce624417.
2021-03-01 04:52:08,303 INFO  
org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 
pods from previous attempts, current attempt id is 1.
2021-03-01 04:52:08,303 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
Recovered 0 workers from previous attempt.
2021-03-01 04:52:08,306 INFO  
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
ResourceManager akka.tcp://flink@demo.default:6123/user/rpc/resourcemanager_0 
was granted leadership with fencing token 
2021-03-01 04:52:08,310 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
Starting the SlotManager.
2021-03-01 04:52:08,596 WARN  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-03-01 04:52:08,596 WARN  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Unhandled 
exception
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_275]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_275]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_275]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_275]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
 [flink-dist_2.12-1.12.0.jar:1.12.0]
at 

Re: apache-flink +spring boot - logs related to spring boot application start up not printed in file in flink 1.12

2021-02-28 Thread Abhishek Shukla
Hi Matthias,
Thanks for replying,
I checked both of these pages,
And I downloaded the zip of flink 1.12.1 so the changes related to log4j2
are there in property file,

I am able to see the logs of the pipeline once the application is up, but the logs
related to application failure, successful bean creation, or post-construct
processing are not getting printed to the file, which was happening
in Flink 1.9 with the provided log4j-cli.properties file.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink job keeps failing and retrying: producer id is not assigned to its transactional id

2021-02-28 Thread Lynn Chen
Hi all,


Flink reports an error when producing data to Kafka, which causes the job to keep retrying.


What we have observed: after each job starts, it runs normally for about 20 days before this problem appears, causing the job to retry continuously. We have not yet found the real root cause.




Error message:


org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
at java.lang.Thread.run(Thread.java:748)




Kafka producer configuration:


// InstanceAlreadyExistsException
prop.setProperty("client.id", "")
// adjust the producer transaction timeout (transaction.timeout.ms)
prop.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
prop.setProperty("max_in_flight_requests_per_connection", "1")
// idempotent producer (ENABLE_IDEMPOTENCE_CONFIG)
prop.setProperty("enable_idempotence_config", "true")
// RETRIES_CONFIG
prop.setProperty("retries_config", "5")

val kafkaSink: FlinkKafkaProducer[String] = new FlinkKafkaProducer[String](
  topic,
new ResultStringKafkaSerializationSchema(topic),
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)


Flink is configured with EXACTLY_ONCE as well:
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)


Thanks, everyone!











Re: java Flink local test failure (Could not create actor system)

2021-02-28 Thread Smile
Hi Vijay,

Since version 1.7 Flink builds with Scala version 2.11 (default) and 2.12.
Flink has APIs, libraries, and runtime modules written in Scala. Users of
the Scala API and libraries may have to match the Scala version of Flink
with the Scala version of their projects (because Scala is not strictly
backward compatible). See [1] for more information.

If you use Maven, the artifactId of a Flink component usually ends with the Scala
version; for example, flink-streaming-java_2.11 means it was built against Scala 2.11.
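For example, a pom.xml fragment pinning the Scala 2.11 build of the DataStream API (the version number is only an example):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.12.1</version>
</dependency>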

[1].
https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html#scala-versions

Regards,
Smile



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Union fields with time attributes have different types

2021-02-28 Thread Jark Wu
Hi Sebastián,

`endts` in your case is a time attribute, which is slightly different from a
regular TIMESTAMP type.
You can manually `cast(endts AS TIMESTAMP(3))` to make this query work,
which removes the time attribute metadata.

SELECT `evt`, `value`, `startts`, CAST(endts AS TIMESTAMP(3))
FROM aggs_1m


Best,
Jark

On Mon, 22 Feb 2021 at 05:01, Sebastián Magrí  wrote:

> I'm using a query like this
>
> WITH aggs_1m AS (
>   SELECT
> `evt`,
> `startts`
> `endts`,
> SUM(`value`) AS `value`
>   FROM aggregates_per_minute
> ), aggs_3m AS (
>   SELECT
> `evt`,
> TUMBLE_START(`endts`, INTERVAL '3' MINUTE) AS `startts`,
> TUMBLE_END(`endts`, INTERVAL '3' MINUTE) AS `endts`,
> SUM(`c`) AS `value`
>   FROM aggregates_per_minute
>   GROUP BY t, TUMBLE(`endts`, INTERVAL '3' MINUTE)
> )
> SELECT `evt`, `value`, `startts`, `endts`
> FROM aggs_1m
> UNION
> SELECT `evt`, `value`, `startts`, `endts`
> FROM aggs_3m
>
> But it's throwing this exception
>
> org.apache.flink.table.api.ValidationException: Union fields with time
> attributes have different types.
>
> Doesn't TUMBLE_START(somets, ...) return a TIMESTAMP of the same type?
>
> --
> Sebastián Ramírez Magrí
>


Re: Best way to handle BIGINT to TIMESTAMP conversions

2021-02-28 Thread Jark Wu
Hi Sebastián,

You can use `TO_TIMESTAMP(FROM_UNIXTIME(e))` to get a timestamp value.
The BIGINT should be in seconds. Please note that you need to declare the computed
column in the DDL schema and declare a watermark strategy on this computed field
to make it a rowtime attribute, because a streaming over window requires ordering
by a time attribute.
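For example, a sketch of what the DDL could look like for the `sessions` table from the question below, assuming `e` holds epoch milliseconds (hence the division by 1000, since FROM_UNIXTIME expects seconds); the computed column name `end_ts` and the 5-second watermark delay are arbitrary choices for illustration:

CREATE TABLE sessions (
  `ats`    STRING,
  `e`      BIGINT,
  `s`      BIGINT,
  `end_ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`e` / 1000)),       -- computed column
  WATERMARK FOR `end_ts` AS `end_ts` - INTERVAL '5' SECOND,  -- makes it a rowtime attribute
  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
)

With that in place, the over windows can ORDER BY `end_ts` instead of `proc_time`.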

Best,
Jark

On Sun, 21 Feb 2021 at 07:32, Sebastián Magrí  wrote:

> I have a table with two BIGINT fields for start and end of an event as
> UNIX time in milliseconds. I want to be able to have a resulting column
> with the delta in milliseconds and group by that difference. Also, I want
> to be able to have aggregations with window functions based upon the `end`
> field.
>
> The table definition looks like this:
> |CREATE TABLE sessions (
> |  `ats`   STRING,
> |  `e` BIGINT,
> |  `s` BIGINT,
> |  `proc_time` AS PROCTIME(),
> |  PRIMARY KEY (`ats`, `s`, `e`) NOT ENFORCED
> |)
>
> Then I have a few views like this:
>
> CREATE VIEW second_sessions AS
>   SELECT * FROM sessions
>   WHERE `e` - `s` = 1000
>
> And some windows using these views like this:
>
>   WINDOW w3m AS (
> PARTITION BY `t`
> ORDER BY `proc_time`
> RANGE BETWEEN INTERVAL '3' MINUTE PRECEDING AND CURRENT ROW
>   )
>
> I'd like to use the `e` field for windowing instead of `proc_time`. But I
> keep running into errors with the `TO_TIMESTAMP(BIGINT)` function now
> missing or with unsupported timestamp arithmetics.
>
> What is the best practice for a case such as this?
>
> Best Regards,
> --
> Sebastián Ramírez Magrí
>


Re: Re: flink 1.12 sql_query() on the same source tables: PyFlink takes 9 min, Java takes 3 s

2021-02-28 Thread xiaoyue
Hi Xingbo,
Thank you very much for your reply. The reason for converting to pandas is to use pandas' matrix-computation methods.
The project requirement is to compute with Flink and return the result directly to the frontend, so the pipeline effectively has a source but no sink.
I have also tried defining a pandas-type UDAF and returning the aggregated result, but the execution time did not change much.

So I'd like to ask: for the business scenario just described, is there a recommended way to run this that keeps things efficient while still using the matrix computations in pandas or numpy? Many thanks!







On 2021-03-01 09:54:49, "Xingbo Huang" wrote:
>Hi,
>
>The difference is that you used to_pandas(), which is slow (it has to collect all the data back to the client and then build a Python DataFrame, hence the slowness). to_pandas() is generally used for debugging; it is convenient, but the performance is poor. If you care about performance, just switch to a different sink.
>
>Best
>Xingbo
>
>xiaoyue <18242988...@163.com> wrote on Friday, February 26, 2021 at 12:38 PM:
>
>> Has anyone else run into this problem? In a streaming environment I connect to a MySQL database and define two source tables, source1 and source2, via DDL.
>>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # get the query result
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> The same processing runs much faster in Java than in Python. What could be causing the difference?
>> Since PyFlink is just a thin wrapper around the Flink interfaces, could the GIL be the cause?
>> Hoping someone knowledgeable can answer; anyone who has hit the same problem is welcome to discuss. Thanks!
>>
>>


Re: Re: flink 1.12 sql_query() on the same source tables: PyFlink takes 9 min, Java takes 3 s

2021-02-28 Thread xiaoyue
So I'd like to ask: for the business scenario just described, is there a recommended way to run this that keeps things efficient while still using the matrix computations in pandas or numpy? Many thanks!

















On 2021-03-01 09:54:49, "Xingbo Huang" wrote:
>Hi,
>
>The difference is that you used to_pandas(), which is slow (it has to collect all the data back to the client and then build a Python DataFrame, hence the slowness). to_pandas() is generally used for debugging; it is convenient, but the performance is poor. If you care about performance, just switch to a different sink.
>
>Best
>Xingbo
>
>xiaoyue <18242988...@163.com> wrote on Friday, February 26, 2021 at 12:38 PM:
>
>> Has anyone else run into this problem? In a streaming environment I connect to a MySQL database and define two source tables, source1 and source2, via DDL.
>>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # get the query result
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> The same processing runs much faster in Java than in Python. What could be causing the difference?
>> Since PyFlink is just a thin wrapper around the Flink interfaces, could the GIL be the cause?
>> Hoping someone knowledgeable can answer; anyone who has hit the same problem is welcome to discuss. Thanks!
>>
>>


Re: Re: flink 1.12 sql_query() on the same source tables: PyFlink takes 9 min, Java takes 3 s

2021-02-28 Thread xiaoyue
Hi Xingbo,
Thank you very much for your reply. The reason for converting to pandas is to use pandas' matrix-computation methods.
The project requirement is to compute with Flink and return the result directly to the frontend, so the pipeline effectively has a source but no sink.
I have also tried defining a pandas-type UDAF and returning the aggregated result, but the execution time did not change much.



















On 2021-03-01 09:54:49, "Xingbo Huang" wrote:
>Hi,
>
>The difference is that you used to_pandas(), which is slow (it has to collect all the data back to the client and then build a Python DataFrame, hence the slowness). to_pandas() is generally used for debugging; it is convenient, but the performance is poor. If you care about performance, just switch to a different sink.
>
>Best
>Xingbo
>
>xiaoyue <18242988...@163.com> wrote on Friday, February 26, 2021 at 12:38 PM:
>
>> Has anyone else run into this problem? In a streaming environment I connect to a MySQL database and define two source tables, source1 and source2, via DDL.
>>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
>> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
>> '20170307'"
>> # get the query result
>> query_table = env.sql_query(sql)
>> query_table.to_pandas()
>> The same processing runs much faster in Java than in Python. What could be causing the difference?
>> Since PyFlink is just a thin wrapper around the Flink interfaces, could the GIL be the cause?
>> Hoping someone knowledgeable can answer; anyone who has hit the same problem is welcome to discuss. Thanks!
>>
>>


PyFlink and HBase

2021-02-28 Thread (sender name lost to encoding)
(Message body lost to encoding; only the keywords "pyflink" and "hbase" are recoverable.)

Re: flink 1.12 sql_query() on the same source tables: PyFlink takes 9 min, Java takes 3 s

2021-02-28 Thread Xingbo Huang
Hi,

The difference is that you used to_pandas(), which is slow (it has to collect all the data back to the client and then build a Python DataFrame, hence the slowness). to_pandas() is generally used for debugging; it is convenient, but the performance is poor. If you care about performance, just switch to a different sink.

Best
Xingbo

xiaoyue <18242988...@163.com> wrote on Friday, February 26, 2021 at 12:38 PM:

> Has anyone else run into this problem? In a streaming environment I connect to a MySQL database and define two source tables, source1 and source2, via DDL.
>  sql = "SELECT ID, NAME, IP, PHONE FROM source1 JOIN source2 ON source1.ID
> = source2.ID WHERE ID = '123456'  AND DATE BETWEEN '20160701' AND
> '20170307'"
> # get the query result
> query_table = env.sql_query(sql)
> query_table.to_pandas()
> The same processing runs much faster in Java than in Python. What could be causing the difference?
> Since PyFlink is just a thin wrapper around the Flink interfaces, could the GIL be the cause?
> Hoping someone knowledgeable can answer; anyone who has hit the same problem is welcome to discuss. Thanks!
>
>


Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
It looks like I was finally able to get the expected labeling behavior that
I was looking for by simply storing a reference to the underlying
MetricGroup and then keeping track of any new metrics that I needed to
dynamically create and use downstream:

class MagicMetricRegistry(private val metricGroup: MetricGroup) : Serializable {
    // Reference for all of the registered metrics
    private val registeredMetrics: HashMap<String, Counter> = hashMapOf()

    // Increments a given metric by key
    fun inc(metric: String, tenant: String, source: String, amount: Long = 1) {
        // Store a key
        val key = "$metric-$tenant-$source"
        if (!registeredMetrics.containsKey(key)) {
            registeredMetrics[key] = metricGroup
                .addGroup("tenant", tenant)
                .addGroup("source", source)
                .counter(metric)
        }

        // Update the metric by a given amount
        registeredMetrics[key]!!.inc(amount)
    }
}

And then, within the open call of my KeyedProcessFunction, I simply stored a
reference to it and registered any new combinations (in this case tenant/source)
as they came in:

class MagicWindowFunction : KeyedProcessFunction<...>() {
    @Transient private lateinit var metrics: MagicMetricRegistry

    override fun open(parameters: Configuration) {
        metrics = MagicMetricRegistry(runtimeContext.metricGroup)
    }

    override fun processElement(...) {
        // Omitted for brevity

        metrics.inc("logs_seen", "my-tenant", "my-source")
    }

    // Omitted for brevity
}

This appears to be working as expected as far as I can tell at this point.
I can see all of the expected labels appearing within Prometheus and
further downstream in Grafana!

Thanks again,

Rion

On Sun, Feb 28, 2021 at 8:15 AM Rion Williams  wrote:

> Thanks Dylan,
>
> Totally understandable. I already have the appropriate exporters /
> monitors in place for scraping metrics from Flink, including custom ones,
> into Prometheus. The labeling challenge is really the big one as while I
> see lots of labels for the metrics being exported (e.g. job id, worker,
> etc.) I didn’t see a mechanism to inject my own into those coming from
> Flink.
>
> Additionally, in my specific use case I’m dealing with a multi-tenant
> pipeline (I.e. reading messages from a single multi-tenant Kafka topic),
> which is where the labeling comes in. I’d love to be able to have a counter
> (among other types of metrics) with their appropriate labels for each
> tenant.
>
> I suppose I could implement a custom counter or series of counters (one
> for each tenant) that would each be responsible for keeping track of their
> own respective tenant values. In my case I’m dealing with a
> KeyedProcessFunction, so I only have access to the key (tenant) within the
> processElement function as opposed to when the function is initially
> opened, where I understand you would typically register a metric.
>
> Sorry for the somewhat convoluted response, I’m still getting accustomed
> to some of the Flink APIs, specifically around metrics.
>
> Thanks,
>
> Rion
>
> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan <
> dylan.t.meiss...@nordstrom.com> wrote:
>
> 
> Hi Rion,
>
> Regarding the question about adding Prometheus labels out of the box. This
> is common ask of all exporters, but Prometheus philosophy sees this as an
> "anti-pattern" as the metrics source can often be ambivalent about context.
> See [0] for example of such a discussion.
>
> Instead, we can establish context during service discovery. If, for
> example, we run clusters for tenants on Kubernetes, then within the
> kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add
> the Kubernetes labels from the pods, such as "tenant-id: foo" and
> "environment: staging" to each incoming metric it processes.
>
> This isn't limited to Kubernetes; each of the service discovery configs
> designed to accomodate translating metadata from context into metric labels.
>
> If this doesn't work for you, then consider encoding tenant identifier
> into job names, and extract this identifier in a metric_relabel_config [2]
>
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
>
>
> --
> *From:* Rion Williams 
> *Sent:* Sunday, February 28, 2021 12:46 AM
> *To:* Prasanna kumar 
> *Cc:* user 
> *Subject:* Re: Using Prometheus Client Metrics in Flink
>
> Hi Prassana,
>
> Thanks for that. It’s what I was doing previously as a workaround however
> I was just curious if there was any Flink-specific functionality to handle
> this prior to Prometheus.
>
> Additionally from the docs on metrics [0], it seems that there’s a pattern
> in place to use supported third-party metrics such as those from
> CodeHale/DropWizard via a 

Standard method to generate watermark for BoundedOutOfOrderness

2021-02-28 Thread Maminspapin
Hello, everyone.

I'm learning Flink but still not sure if I realise the topic of watermark
mechanism.

Here is a simple, common example of a pipeline in event-time mode:


 

I want to use the strategy
*WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(6))* in my
code. Does it mean that this strategy works by the formula:

*max(event_time) - 6 sec. = new watermark?*

So the steps would be:
0. At first we have w(0) and no input events
1. Get 4.  4 is new max. -> 4-6 < 0 -> still w(0)
2. Get 2.  2 < 4 -> 4 is max -> still w(0)
3. Get 11.11 > 4 -> 11 is new max -> 11-6 = 5 -> new watermark w(5)
4. Get 7.  11 is max. -> still w(5)
5. Get 9.  11 is max. -> still w(5)
6. Get 15  15 > 11 -> 15 is new max -> 15-6 = 9 -> new watermark w(9)

Am I right?

Thanks,
Yuri L.
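For reference, a minimal fragment of how such a strategy is typically attached (assuming a DataStream<MyEvent> called events and an epoch-millis getTimestamp() accessor; both names are illustrative):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Watermarks are emitted periodically (every 200 ms by default) as roughly
// max(seen event time) - 6 s, matching the bookkeeping in the steps above.
events.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(6))
                .withTimestampAssigner((event, previousTimestamp) -> event.getTimestamp()));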



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Issues running multiple Jobs using the same JAR

2021-02-28 Thread Kezhu Wang
Hi Morgan,

You could check FLINK-11654; from its description, I think it is the
problem you encountered.

> We run multiple jobs on a cluster which write a lot to the same Kafka
topic from identically named sinks. When EXACTLY_ONCE semantic is enabled
for the KafkaProducers we run into a lot of ProducerFencedExceptions and
all jobs go into a restart cycle.

FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654


Best,
Kezhu Wang


On February 28, 2021 at 22:35:02, Morgan Geldenhuys (
morgan.geldenh...@tu-berlin.de) wrote:

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR
in the same Flink native cluster (all 1.12.1).

When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at
least one previous transactional or idempotent request has failed with
errors.
at org.apache.kafka.clients.producer.internals.TransactionManager
.failIfNotReadyForSend(TransactionManager.java:356)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer
.java:926)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer
.java:865)
at org.apache.flink.streaming.connectors.kafka.internals.
FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:915)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.
TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(
StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.
OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
.java:187)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:395)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:609)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: org.apache.flink.streaming.connectors.kafka.
FlinkKafkaException: Failed to send data to Kafka: Producer attempted an
operation with an old epoch. Either there is a newer producer with the same
transactionalId, or the producer's transaction has been expired by the
broker.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.checkErroneous(FlinkKafkaProducer.java:1392)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
.close(FlinkKafkaProducer.java:965)
at org.apache.flink.api.common.functions.util.FunctionUtils
.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.
AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.disposeAllOperators(StreamTask.java:783)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.runAndSuppressThrowable(StreamTask.java:762)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.cleanUpInvoke(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
StreamTask.java:585)
... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has
been expired by the broker.
Suppressed: java.lang.IllegalStateException: Pending record count
must be zero at this point: 1
at org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
at org.apache.flink.streaming.connectors.kafka.
FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
at org.apache.flink.api.common.functions.util.FunctionUtils
.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.
AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at org.apache.flink.streaming.runtime.tasks.StreamTask
.disposeAllOperators(StreamTask.java:783)
at org.apache.flink.streaming.runtime.tasks.StreamTask

Issues running multiple Jobs using the same JAR

2021-02-28 Thread Morgan Geldenhuys

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same 
JAR in the same Flink native cluster (all 1.12.1).


When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at 
least one previous transactional or idempotent request has failed with 
errors.
at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: Producer attempted an operation with an old 
epoch. Either there is a newer producer with the same transactionalId, 
or the producer's transaction has been expired by the broker.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)

... 3 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: 
Producer attempted an operation with an old epoch. Either there is a newer 
producer with the same transactionalId, or the producer's transaction has 
been expired by the broker.
Suppressed: java.lang.IllegalStateException: Pending record count must be 
zero at this point: 1
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: java.lang.IllegalStateException: Pending record count must be 
zero at this point: 1
at 

Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Kezhu Wang
Hi,

You could also try `cancel --withSavepoint [savepointDir]`, even though it is
deprecated. Compared to the take-savepoint-then-cancel approach, there
will be no checkpoints in between. This may be important if there are
two-phase-commit operators in your job.
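For reference, the two variants look roughly like this on the CLI (the job id and savepoint directory are placeholders):

# deprecated, but savepoint and cancel happen as one step
./bin/flink cancel --withSavepoint <savepointDir> <jobId>

# the two-step alternative (checkpoints may still be triggered in between)
./bin/flink savepoint <jobId> <savepointDir>
./bin/flink cancel <jobId>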


Best,
Kezhu Wang


On February 28, 2021 at 20:50:29, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around by taking
two separate actions when stopping job: take-savepoints, then cancel.
--
*From:* Kezhu Wang 
*Sent:* Sunday, February 28, 2021 12:31 AM
*To:* user@flink.apache.org ; Meissner, Dylan <
dylan.t.meiss...@nordstrom.com>
*Subject:* Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created
FLINK-21522 for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented
as this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required
to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain
", the operation never completes, reporting IN_PROGRESS until I hit
the "failure-cause:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed
their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%)
| end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
| end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
Thanks Dylan,

Totally understandable. I already have the appropriate exporters / monitors in 
place for scraping metrics from Flink, including custom ones, into Prometheus. 
The labeling challenge is really the big one as while I see lots of labels for 
the metrics being exported (e.g. job id, worker, etc.) I didn’t see a mechanism 
to inject my own into those coming from Flink.

Additionally, in my specific use case I’m dealing with a multi-tenant pipeline 
(I.e. reading messages from a single multi-tenant Kafka topic), which is where 
the labeling comes in. I’d love to be able to have a counter (among other types 
of metrics) with their appropriate labels for each tenant.

I suppose I could implement a custom counter or series of counters (one for 
each tenant) that would each be responsible for keeping track of their own 
respective tenant values. In my case I’m dealing with a KeyedProcessFunction, 
so I only have access to the key (tenant) within the processElement function as 
opposed to when the function is initially opened, where I understand you would 
typically register a metric.

Sorry for the somewhat convoluted response, I’m still getting accustomed to 
some of the Flink APIs, specifically around metrics.

Thanks,

Rion

> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan  
> wrote:
> 
> 
> Hi Rion,
> 
> Regarding the question about adding Prometheus labels out of the box. This is 
> common ask of all exporters, but Prometheus philosophy sees this as an 
> "anti-pattern" as the metrics source can often be ambivalent about context. 
> See [0] for example of such a discussion.
> 
> Instead, we can establish context during service discovery. If, for example, 
> we run clusters for tenants on Kubernetes, then within the 
> kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add 
> the Kubernetes labels from the pods, such as "tenant-id: foo" and 
> "environment: staging" to each incoming metric it processes.
> 
> This isn't limited to Kubernetes; each of the service discovery configs 
> designed to accomodate translating metadata from context into metric labels.
> 
> If this doesn't work for you, then consider encoding tenant identifier into 
> job names, and extract this identifier in a metric_relabel_config [2]
> 
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]: 
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]: 
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
> 
> 
> From: Rion Williams 
> Sent: Sunday, February 28, 2021 12:46 AM
> To: Prasanna kumar 
> Cc: user 
> Subject: Re: Using Prometheus Client Metrics in Flink
>  
> Hi Prassana,
> 
> Thanks for that. It’s what I was doing previously as a workaround however I 
> was just curious if there was any Flink-specific functionality to handle this 
> prior to Prometheus.
> 
> Additionally from the docs on metrics [0], it seems that there’s a pattern in 
> place to use supported third-party metrics such as those from 
> CodeHale/DropWizard via a Maven package (flink-metrics-dropwizard). I do see 
> a similarly named package for Prometheus which may be what I’m looking for as 
> it’s similarly named (flink-metrics-prometheus), so I may give that a try.
> 
> Thanks,
> 
> Rion
> 
> [0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
> 
>>> On Feb 28, 2021, at 12:20 AM, Prasanna kumar 
>>>  wrote:
>>> 
>> 
>> Rion,
>> 
>> Regarding the second question , you can aggregate by using sum function  
>> sum(metric_name{jobb_name="JOBNAME"}) .  This works is you are using the 
>> metric counter.
>> 
>> Prasanna.
>> 
>> On Sat, Feb 27, 2021 at 9:01 PM Rion Williams  wrote:
>> Hi folks,
>> 
>> I’ve just recently started working with Flink and I was in the process of 
>> adding some metrics through my existing pipeline with the hopes of building 
>> some Grafana dashboards with them to help with observability.
>> 
>> Initially I looked at the built-in Flink metrics that were available, but I 
>> didn’t see an easy mechanism for setting/using labels with them. 
>> Essentially, I have two properties for my messages coming through the 
>> pipeline that I’d like to be able to keep track of (tenant/source) across 
>> several metrics (e.g. total_messages with tenant / source labels, etc.). I 
>> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a 
>> good pattern for handling these.
>> 
>> I had previously used the Prometheus Client metrics [0] to accomplish this 
>> in the past but I wasn’t entirely sure how it would/could mesh with Flink. 
>> Does anyone have experience in working with these or know if they are 
>> supported?
>> 
>> Secondly, when using the Flink metrics, I noticed I was receiving a separate 
>> metric for each task that was being spun up. Is there an “easy button” to 
>> handle aggregating these to ensure that a single metric (e.g. 
>> total_messages) reflects 

Re: Setting max parallelism via properties

2021-02-28 Thread Padarn Wilson
Thanks a lot Kezhu, this fits the bill perfectly.

Thanks,
Padarn
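For reference, the option mentioned in the quoted reply below can be set either in the client-side flink-conf.yaml or per submission; the value 128 is only an example, and the -D form assumes a Flink 1.11+ CLI:

# flink-conf.yaml read by the client that builds the job graph
pipeline.max-parallelism: 128

# or per submission
./bin/flink run -Dpipeline.max-parallelism=128 ...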



On Sun, Feb 28, 2021 at 7:00 PM Kezhu Wang  wrote:

> Hi Padarn,
>
> There is a configuration option “pipeline.max-parallelism”.
>
> It is not a cluster-wide configuration but a client/job/pipeline-side
> configuration, which means this setting has to be carried
> from the flink conf file into the pipeline-generation stage.
>
>
> If I understand correctly, `flink-on-k8s-operator` uses `flink run` (I
> found this in `flinkcluster_submit_job_script.go`) to submit the job to the
> cluster. This command already covers the bridging work, so I think it should
> just work in your case.
>
>
> pipeline-max-parallelism:
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#pipeline-max-parallelism
>
>
> Best,
> Kezhu Wang
>
> On February 28, 2021 at 16:45:03, Padarn Wilson (pad...@gmail.com) wrote:
>
> Hi all,
>
> Sorry for the basic question, but is it possible to set max
> parallelism using the flink conf file, rather than explicitly in code:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
>
> Need this for a PR I am working on for the flink operator:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425
>
>


Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Meissner, Dylan
Hi Rion,

Regarding the question about adding Prometheus labels out of the box. This is 
common ask of all exporters, but Prometheus philosophy sees this as an 
"anti-pattern" as the metrics source can often be ambivalent about context. See 
[0] for example of such a discussion.

Instead, we can establish context during service discovery. If, for example, we 
run clusters for tenants on Kubernetes, then within the kubernetes_sd_config 
[1] labelling rules we can instruct Prometheus to add the Kubernetes labels 
from the pods, such as "tenant-id: foo" and "environment: staging" to each 
incoming metric it processes.

This isn't limited to Kubernetes; each of the service discovery configs 
designed to accomodate translating metadata from context into metric labels.

If this doesn't work for you, then consider encoding tenant identifier into job 
names, and extract this identifier in a metric_relabel_config [2]

[0]: https://github.com/prometheus/node_exporter/issues/319
[1]: 
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
[2]: 
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs



From: Rion Williams 
Sent: Sunday, February 28, 2021 12:46 AM
To: Prasanna kumar 
Cc: user 
Subject: Re: Using Prometheus Client Metrics in Flink

Hi Prassana,

Thanks for that. It’s what I was doing previously as a workaround however I was 
just curious if there was any Flink-specific functionality to handle this prior 
to Prometheus.

Additionally from the docs on metrics [0], it seems that there’s a pattern in 
place to use supported third-party metrics such as those from 
CodeHale/DropWizard via a Maven package (flink-metrics-dropwizard). I do see a 
similarly named package for Prometheus which may be what I’m looking for as 
it’s similarly named (flink-metrics-prometheus), so I may give that a try.

Thanks,

Rion

[0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html

On Feb 28, 2021, at 12:20 AM, Prasanna kumar  
wrote:


Rion,

Regarding the second question , you can aggregate by using sum function  
sum(metric_name{jobb_name="JOBNAME"}) .  This works is you are using the metric 
counter.

Prasanna.

On Sat, Feb 27, 2021 at 9:01 PM Rion Williams 
mailto:rionmons...@gmail.com>> wrote:
Hi folks,

I’ve just recently started working with Flink and I was in the process of 
adding some metrics through my existing pipeline with the hopes of building 
some Grafana dashboards with them to help with observability.

Initially I looked at the built-in Flink metrics that were available, but I 
didn’t see an easy mechanism for setting/using labels with them. Essentially, I 
have two properties for my messages coming through the pipeline that I’d like 
to be able to keep track of (tenant/source) across several metrics (e.g. 
total_messages with tenant / source labels, etc.). I didn’t see an easy way to 
adjust this out of the box, or wasn’t aware of a good pattern for handling 
these.

I had previously used the Prometheus Client metrics [0] to accomplish this in 
the past but I wasn’t entirely sure how it would/could mesh with Flink. Does 
anyone have experience in working with these or know if they are supported?

Secondly, when using the Flink metrics, I noticed I was receiving a separate 
metric for each task that was being spun up. Is there an “easy button” to 
handle aggregating these to ensure that a single metric (e.g. total_messages) 
reflects the total processed across all of the tasks instead of each individual 
one?

Any recommendations / resources / advice would be greatly appreciated!

Thanks,

Rion

[0] : https://prometheus.io/docs/instrumenting/clientlibs/


Flink Metrics

2021-02-28 Thread Prasanna kumar
Hi flinksters,

Scenario: We have CDC messages from our RDBMS (various tables) flowing to
Kafka. Our Flink job reads the CDC messages and creates events based on
certain rules.

I am using Prometheus and Grafana.

The following are the metrics I need to calculate:

A) Number of CDC messages with respect to each table.
B) Number of events created with respect to each event type.
C) Average/P99/P95 latency (event created ts - cdc operation ts)

For A and B, I created counters and am able to see the metrics flowing into
Prometheus. A few questions I have here:

1) How do I create labels for counters in Flink? I did not find an easy
method to do it. Right now it looks like I need to create counters for each
type of table and event. I referred to one of the community discussions
[1]. Is there any other way?

2) When the job gets restarted, the counters go back to 0. How can I
prevent that and keep continuity?

For C, I calculated the latency in code for each event and assigned it to a
histogram. A few questions I have here:

3) I read in a few blogs [2] that a histogram is the best way to track
latencies. Is there a better approach?

4) How do I create buckets for various ranges? I also read in a community
email that Flink implements histograms as summaries. I also need to be able
to see the latencies over time.

[1]
https://stackoverflow.com/questions/58456830/how-to-use-multiple-counters-in-flink
[2] https://povilasv.me/prometheus-tracking-request-duration/

Thanks,
Prasanna.
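For what it's worth, a minimal Java sketch of the addGroup approach from [1] combined with a Dropwizard histogram for the latency. The CdcEvent type and its getTable()/getOperationTs() accessors are hypothetical, and flink-metrics-dropwizard has to be on the classpath:

import java.util.HashMap;
import java.util.Map;

import com.codahale.metrics.SlidingWindowReservoir;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;

// Hypothetical sketch: one counter per CDC table (exposed with a "table" label by the
// Prometheus reporter via the user scope group) plus a latency histogram.
public class CdcMetricsFunction extends RichMapFunction<CdcEvent, CdcEvent> {

    private transient Map<String, Counter> perTableCounters;
    private transient Histogram latency;

    @Override
    public void open(Configuration parameters) {
        perTableCounters = new HashMap<>();
        latency = getRuntimeContext().getMetricGroup().histogram(
                "event_latency_ms",
                new DropwizardHistogramWrapper(
                        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
    }

    @Override
    public CdcEvent map(CdcEvent event) {
        // Register a counter lazily for each table name the first time it is seen.
        perTableCounters
                .computeIfAbsent(event.getTable(), table ->
                        getRuntimeContext().getMetricGroup().addGroup("table", table).counter("cdc_messages"))
                .inc();
        latency.update(System.currentTimeMillis() - event.getOperationTs());
        return event;
    }
}

Regarding (2): Prometheus counters are expected to reset when a process restarts; rate() and increase() in PromQL account for counter resets, so continuity usually does not need to be handled on the Flink side.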


Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Meissner, Dylan
Thank you for opening the bug and including the extra context.

I'll track the progress and, in the meantime, I will work around it by taking two 
separate actions when stopping the job: take a savepoint, then cancel.

From: Kezhu Wang 
Sent: Sunday, February 28, 2021 12:31 AM
To: user@flink.apache.org ; Meissner, Dylan 

Subject: Re: Stateful functions 2.2 and stop with savepoint

Hi,

Thanks for reporting. I think it is a Flink bug and have created FLINK-21522 
for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang


On February 28, 2021 at 00:59:04, Meissner, Dylan 
(dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented as 
this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required to 
use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, 
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain 
", the operation never completes, reporting IN_PROGRESS until I hit the 
"failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: 
Checkpoint expired before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed their 
work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | 
end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | 
end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Independence of task parallelism

2021-02-28 Thread Jan Nitschke
Hello, 

We are working on a project where we want to gather information about the job 
performance across different task level parallelism settings.
Essentially, we want to see how the throughput of a single task varies across 
different parallelism settings, e.g. for a job of 5 tasks: 1-1-1-1-1 vs. 
1-2-1-1-1 vs. 2-2-2-2-2. 

We are running Flink on Kubernetes, a job with 5 tasks, slot sharing is 
enabled, operator chaining is disabled, and each task manager has one slot.

So, the number of task managers always equals the highest parallelism, and we can fit 
the entire job into one task manager slot. 

We are then running the job against multiple parallelism configs (such as those 
above), collect the relevant metrics and try to get some useful information out 
of them. 

We are now wondering how independent our results are from one another. More 
specifically, if we now look at the parallelism of the second task, is its 
performance independent of the parallelism of the other tasks? So, will the 
second task perform the same in (1-2-1-1-1) as in (2-2-2-2-2)? 

Our take on it is the following: With our setup, (1-2-1-1-1) should result in 
one task manager holding the entire job and a second task manager that only 
runs the second task. (2-2-2-2-2) will run two task managers, each with the entire 
job. So, theoretically, the second task should have much more resources 
available in the first setup, as it has the entire resources of that task 
manager at its disposal. Does that assumption hold, or will Flink assign a 
certain amount of resources to a task in a task manager no matter how many 
other tasks are running in that same task manager slot? 

We would highly appreciate any help. 

Best, 
Jan

Re: Setting max parallelism via properties

2021-02-28 Thread Kezhu Wang
Hi Padarn,

There is a configuration option “pipeline.max-parallelism”.

It is not a cluster-wide configuration but a client/job/pipeline-side
configuration, which means you have to bring this configuration
from the Flink conf file to the pipeline generation stage.


If I understand correctly, `flink-on-k8s-operator` uses `flink run` (I found
this in `flinkcluster_submit_job_script.go`) to submit the job to the cluster. This
command already covers the bridge work, so I think it should just work in
your case.
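
(A rough sketch of that bridge, added for illustration and assuming Flink 1.12+;
it is not taken from the operator PR. `flink run` loads flink-conf.yaml on the
client, and an environment built from that configuration picks up
`pipeline.max-parallelism` without anything being hard-coded.)

```java
// Sketch only: what `flink run` effectively does for you on the client side.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismFromConf {
    public static void main(String[] args) throws Exception {
        // Reads flink-conf.yaml from FLINK_CONF_DIR; assume it contains
        // `pipeline.max-parallelism: 128`.
        Configuration conf = GlobalConfiguration.loadConfiguration();

        // Passing the configuration here lets the environment pick up
        // pipeline.* options, including pipeline.max-parallelism.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        System.out.println("pipeline.max-parallelism = "
                + conf.getInteger(PipelineOptions.MAX_PARALLELISM));

        env.fromElements(1, 2, 3).map(x -> x * 2).print();
        env.execute("max-parallelism-from-conf");
    }
}
```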


pipeline-max-parallelism:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#pipeline-max-parallelism


Best,
Kezhu Wang

On February 28, 2021 at 16:45:03, Padarn Wilson (pad...@gmail.com) wrote:

Hi all,

Sorry for the basic question, but is it possible to set max
parallelism using the flink conf file, rather than explicitly in code:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Need this for a PR I am working on for the flink operator:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425


Re: Window Process function is not getting trigger

2021-02-28 Thread Kezhu Wang
Hi,

Glad to hear.

Normally, you would not encounter this when there is plenty of data.
`WatermarkStrategy.withIdleness` could be more appropriate in production.
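
(For reference, a minimal sketch of that — my own addition, adapted to the `Price`
snippet quoted further down in this thread:)

```java
// Sketch only: a drop-in replacement for the watermark section of the snippet quoted below.
// Needs java.time.Duration, java.time.ZoneId and org.apache.flink.api.common.eventtime.WatermarkStrategy.
WatermarkStrategy<Price> strategy = WatermarkStrategy
        .<Price>forMonotonousTimestamps()
        .withTimestampAssigner((p, ts) ->
                p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())  // ms resolution
        .withIdleness(Duration.ofSeconds(5));  // idle inputs no longer hold back the watermark

DataStream<Price> priceStream = price.assignTimestampsAndWatermarks(strategy);
```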


Best,
Kezhu Wang


On February 24, 2021 at 22:35:11, sagar (sagarban...@gmail.com) wrote:

Thanks Kezhu, It worked!!!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang  wrote:

> Try `env.setParallelism(1)`. Default parallelism for local environment is
> `Runtime.getRuntime.availableProcessors`.
>
> Your test data set is so small that when it is scattered across multiple
> parallel instances, there will be no data with event time assigned to
> trigger the downstream computation.
>
> Or you could try `WatermarkStrategy.withIdleness`.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:
>
> It is a fairly simple requirement. If I change it to processing time it
> works fine, but it is not working with event time. Help appreciated!
>
> On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:
>
>> HI
>>
>> Corrected with below code, but still getting same issue
>>
>> Instant instant = 
>> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
>> long timeInMillis = instant.toEpochMilli();
>> System.out.println(timeInMillis);
>> return timeInMillis;
>>
>>
>> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>>
>>> I saw one potential issue. Your timestamp assigner returns timestamp in
>>> second resolution while Flink requires millisecond resolution.
>>>
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>>
>>> I have a simple Flink stream program where I am using a socket as my
>>> continuous source, with a window size of 2 seconds.
>>>
>>> Somehow my window process function is not triggering, and even if I pass
>>> events in any order, Flink is not ignoring them.
>>>
>>> I can see the output only when I kill my socket; please find the code
>>> snippet below
>>>
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>>
>>> DataStream<Price> price = env.socketTextStream("localhost",
>>> 9998).uid("price source").map(new MapFunction<String, Price>() {
>>> @Override
>>> public Price map(String s) throws Exception {
>>> return new Price(s.split(",")[0],
>>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>>> }
>>> }
>>> );
>>>
>>> DataStream<Price> priceStream = price
>>>
>>>  
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps()
>>> .withTimestampAssigner((p,timestamp) ->
>>> {
>>> ZoneId zoneId = ZoneId.systemDefault();
>>> long epoch =
>>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>> System.out.println(epoch);
>>>  return epoch;
>>> }))
>>> .keyBy(new KeySelector<Price, String>() {
>>> @Override
>>> public String getKey(Price price) throws Exception {
>>> return price.getPerformanceId();
>>> }
>>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>> .process(new ProcessWindowFunction<Price, Price, String, TimeWindow>() {
>>>
>>> @Override
>>> public void process(String s, Context context,
>>> Iterable<Price> iterable, Collector<Price> collector) throws Exception {
>>> System.out.println(context.window().getStart()+
>>> "Current watermark: "+context.window().getEnd());
>>> Price p1 = null ;
>>> for(Price p : iterable)
>>> {
>>> System.out.println(p.toString());
>>> p1= p;
>>> }
>>> collector.collect(p1);
>>> }
>>> });
>>>
>>>
>>> priceStream.writeAsText("c:\\ab.txt");
>>>
>>> also data I am inputting are
>>>
>>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail, All Rights are Reserved. If you are not
>>> the intended recipient please ignore this email.
>>>
>>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail, All Rights are Reserved. If you are not
>> the intended recipient 

How to emit after a merge?

2021-02-28 Thread Yik San Chan
I define a `Transaction` class:

```scala
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
```

The `TransactionSource` simply emits `Transaction` with some time interval.
Now I want to compute the last 2 transaction timestamp of each account id,
see code below:

```scala
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  final val QUERY =
"""
  |WITH last_n AS (
  |SELECT accountId, `timestamp`
  |FROM (
  |SELECT *,
  |ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY
`timestamp` DESC) AS row_num
  |FROM transactions
  |)
  |WHERE row_num <= 2
  |)
  |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING))
last2_timestamp
  |FROM last_n
  |GROUP BY accountId
  |""".stripMargin

  def main(args: Array[String]): Unit = {
val settings: EnvironmentSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamEnv: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment =
StreamTableEnvironment.create(streamEnv, settings)

val txnStream: DataStream[Transaction] = streamEnv
  .addSource(new TransactionSource)
  .name("transactions")

tableEnv.createTemporaryView("transactions", txnStream)

tableEnv.executeSql(QUERY).print()
  }
}
```

When I run the program, I get:

```
++--++
| op |accountId |last2_timestamp |
++--++
| +I |1 |  154627200 |
| +I |2 |  154627236 |
| +I |3 |  154627272 |
| +I |4 |  154627308 |
| +I |5 |  154627344 |
| -U |1 |  154627200 |
| +U |1 |154627200,154627380 |
| -U |2 |  154627236 |
| +U |2 |154627236,154627416 |
| -U |3 |  154627272 |
| +U |3 |154627272,154627452 |
| -U |4 |  154627308 |
| +U |4 |154627308,154627488 |
| -U |5 |  154627344 |
| +U |5 |154627344,154627524 |
| -U |1 |154627200,154627380 |
| +U |1 |  154627380 |
| -U |1 |  154627380 |
| +U |1 |154627380,154627560 |
(to continue)
```

Let's focus on the last transaction (from above) of accountId=1. When there
is a new transaction from account 1 that happens at
timestamp=154627560, there are 4 operations in total.

```
++--++
| op |accountId |last2_timestamp |
++--++
| -U |1 |154627200,154627380 |
| +U |1 |  154627380 |
| -U |1 |  154627380 |
| +U |1 |154627380,154627560 |
```

While I only want to emit the below "new status" to my downstream (let's
say another Kafka topic) via some sort of merging:

```
+--++
|accountId |last2_timestamp |
+--++
|1 |154627380,154627560 |
```

So that my downstream is able to consume literally "the last 2 transaction
timestamps of each account":
```
+--++
|accountId |last2_timestamp |
+--++
|1 |  154627200 |
|1 |154627200,154627380 |
|1 |154627380,154627560 |
(to continue)
```

What is the right way to do this?


Setting max parallelism via properties

2021-02-28 Thread Padarn Wilson
Hi all,

Sorry for the basic question, but is it possible to set max
parallelism using the flink conf file, rather than explicitly in code:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism

Need this for a PR I am working on for the flink operator:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/pull/425


Re: Stateful functions 2.2 and stop with savepoint

2021-02-28 Thread Kezhu Wang
Hi,

Thanks for reporting. I think it is a Flink bug and have created
FLINK-21522 for it. You could track progress there.


FLINK-21522: https://issues.apache.org/jira/browse/FLINK-21522


Best,
Kezhu Wang

On February 28, 2021 at 00:59:04, Meissner, Dylan (
dylan.t.meiss...@nordstrom.com) wrote:

I have an embedded function with a SinkFunction as an egress, implemented
as this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required
to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain
", the operation never completes, reporting IN_PROGRESS until I hit
the "failure-cause:
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired
before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed
their work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%)
| end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0%
| end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms |
data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?