Task stays in SCHEDULED state after submitting a Flink job

2021-06-17 Thread Lei Wang
flink-1.11.2
Started the cluster with ./bin/start-cluster.sh and then ran
./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
localhost --port 

But on the JobManager UI the task stays in SCHEDULED state, and after a while the following error is printed:

2021-06-18 13:34:26,683 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b228577f2) switched from SCHEDULED to FAILED on not deployed.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources.
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
~[flink-dist_2.11-1.11.2.jar:1.11.2]

But slot resources are available. Running the same operation on other machines works fine.

Could anyone help explain this?

Thanks,
Lei Wang


Backpressure when Flink writes to ES and HBase

2021-06-17 Thread 田磊
I am using a custom Flink source to read data from one HBase table, which contains about 30 million rows in total. After processing, the data is written to ES and HBase, but every time the job reaches roughly ten-million-plus written rows it hits backpressure. I first suspected ES, but writing to HBase alone shows the same problem; once the problem occurs, nothing gets written at all. There are no exceptions in the logs; see the attachment for details. Both ES and HBase are written in batches. Source and sink parallelism are both 1, and the map operator in between has parallelism 16. Could someone advise?


totorobabyfans
Email: totorobabyf...@163.com


flink 1.12.2 SQL: session window does not close when no new data arrives within the gap

2021-06-17 Thread raofang
Hi, I have a question for everyone:
flink 1.12.2 SQL, BlinkPlanner
When using a rowtime-based session window, if no new data arrives within the configured 10-minute gap, the window does not close and emit its result; the previous session window only closes and emits once new data arriving more than 10 minutes later is received. What could cause the window not to close when the gap elapses without new data?
Thanks~



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Task is always in CREATED state after submitting an example job

2021-06-17 Thread Lei Wang
flink 1.11.2 on a single host.

./bin/start-cluster.sh and then

./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
localhost --port 

But on the jobmanager UI, the task is always in CREATED state. There are
available slots.

Any insights on this?

Thanks,
Lei


Re: Can hbase async lookup guarantee ordered output?

2021-06-17 Thread zilong xiao
Got it, thanks Jark~

Jark Wu  wrote on Fri, Jun 18, 2021 at 10:59 AM:

> You can take a look at the source implementation of AsyncWaitOperator.
>
> Best,
> Jark
>
> On Tue, 15 Jun 2021 at 18:53, zilong xiao  wrote:
>
> > I'd like to understand how 100% ordering is guaranteed here; it feels like asynchronous lookups cannot guarantee the order in which results come back, e.g. due to network latency.
> >
> > Jingsong Li  wrote on Tue, Jun 15, 2021 at 5:07 PM:
> >
> > > It is ordered.
> > >
> > > An unordered mode is currently not supported, as it could affect the correctness of streaming computation.
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Jun 15, 2021 at 3:42 PM zilong xiao 
> wrote:
> > >
> > > > Hi, community experts, I'd like to ask: can hbase async lookup in Flink 1.13 guarantee ordered output?
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
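
For the DataStream API, the ordering guarantee discussed above is controlled by which AsyncDataStream method is used: orderedWait emits results in the order of the input records, while unorderedWait emits them as requests complete. A minimal sketch, with a placeholder lookup function standing in for a real HBase client:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class OrderedAsyncLookupSketch {

    // Placeholder async lookup; a real implementation would query HBase here.
    static class FakeLookup extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            CompletableFuture
                    .supplyAsync(() -> key + "-enriched")   // simulate the async call
                    .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> keys = env.fromElements("row1", "row2", "row3");

        // orderedWait preserves input order; unorderedWait emits as soon as each request completes.
        DataStream<String> enriched = AsyncDataStream.orderedWait(
                keys, new FakeLookup(), 30, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("ordered async lookup sketch");
    }
}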


Re: Can hbase async lookup guarantee ordered output?

2021-06-17 Thread Jark Wu
You can take a look at the source implementation of AsyncWaitOperator.

Best,
Jark

On Tue, 15 Jun 2021 at 18:53, zilong xiao  wrote:

> I'd like to understand how 100% ordering is guaranteed here; it feels like asynchronous lookups cannot guarantee the order in which results come back, e.g. due to network latency.
>
> Jingsong Li  wrote on Tue, Jun 15, 2021 at 5:07 PM:
>
> > It is ordered.
> >
> > An unordered mode is currently not supported, as it could affect the correctness of streaming computation.
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 15, 2021 at 3:42 PM zilong xiao  wrote:
> >
> > > Hi, community experts, I'd like to ask: can hbase async lookup in Flink 1.13 guarantee ordered output?
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: Unsubscribe from the mailing list

2021-06-17 Thread Jark Wu
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jark

On Thu, 17 Jun 2021 at 09:29, wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
> My email address has changed, please unsubscribe!
>
>
>
>


Re: Unsubscribe

2021-06-17 Thread Jark Wu
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jark

On Tue, 15 Jun 2021 at 23:56, frank.liu  wrote:

> Unsubscribe
>
>
> frank.liu
> frank...@163.com


Re: Re: After computing with Flink SQL CDC, the Elasticsearch connector sink with higher parallelism has intermittent problems! The data volume is large and we have not reproduced it reliably yet. Has anyone else run into a similar problem?

2021-06-17 Thread Jark Wu
The community has recently redesigned the mysql-cdc implementation to support parallel reads and checkpoints during the snapshot phase and to remove the global lock dependency.
You can follow the GitHub repository for updates: https://github.com/ververica/flink-cdc-connectors.
The related design and implementation will also be presented at the July meetup; stay tuned.

Best,
Jark

On Thu, 17 Jun 2021 at 09:34, casel.chen  wrote:

> When will Flink CDC support changing the parallelism for fine-grained resource control? I am also seeing a case where a Flink SQL
> CDC job writing to MySQL cannot keep up with the incoming write rate; when will a mechanism like MySQL parallel replication be supported?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-06-16 17:27:14, "Leonard Xu"  wrote:
> >This doesn't look closely related to Flink-CDC; from the exception stack, version_conflict_engine_exception is thrown on the ES side.
> You can look into that exception and check whether there is a write conflict (other jobs/processes also writing to the synced table).
> >
>Best,
> >Leonard
> >
> >> On Jun 16, 2021, at 17:05, mokaful <649713...@qq.com> wrote:
> >>
> >> Same problem here; is there any known way to handle it?
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: after upgrade flink1.12 to flink1.13.1, flink web-ui's taskmanager detail page error

2021-06-17 Thread Yangze Guo
Thanks for the report, Yidan.

It will be fixed via FLINK-23024 and will hopefully land in 1.13.2.

Best,
Yangze Guo

On Fri, Jun 18, 2021 at 10:00 AM yidan zhao  wrote:
>
>  Yeah, I also think it is a bug.
>
> Arvid Heise  wrote on Thu, Jun 17, 2021 at 10:13 PM:
> >
> > Hi Yidan,
> >
> > could you check if the bucket exists and is accessible? Seems like this 
> > directory cannot be created 
> > bos://flink-bucket/flink/ha/opera_upd_FlinkTestJob3/blob.
> >
> > The second issue looks like a bug. I will create a ticket.
> >
> > On Wed, Jun 16, 2021 at 5:21 AM yidan zhao  wrote:
> >>
>> does anyone have an idea? Here is another exception stack.
> >>
> >>
> >> Unhandled exception.
> >> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed
> >> to serialize the result for RPC call : requestTaskManagerDetailsInfo.
> >> at 
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> >> ~[?:1.8.0_251] at
> >> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
> >> ~[?:1.8.0_251] at
> >> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)
> >> ~[?:1.8.0_251] at
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.actor.ActorCell.invoke(ActorCell.scala:561)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.dispatch.Mailbox.run(Mailbox.scala:225)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> [flink-dist_2.11-1.13.1.jar:1.13.1] Caused by:
> >> java.io.NotSerializableException:
> >> org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots at
> >> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> >> ~[?:1.8.0_251] at
> >> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >> ~[?:1.8.0_251] at
> >> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
> >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:387)
> >> ~[flink-dist_2.11-1.13.1.jar:1.13.1] ... 27 more
> >>
> >> yidan zhao  wrote on Fri, Jun 11, 2021 at 4:10 PM:
> >> >
> >> > I upgrade flink 

Re: java.lang.IllegalStateException: Trying to access closed classloader.

2021-06-17 Thread sheng_bigdata
Hi, did you manage to solve this problem? I am now running into the same issue.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: after upgrade flink1.12 to flink1.13.1, flink web-ui's taskmanager detail page error

2021-06-17 Thread yidan zhao
 Yeah, I also think it is a bug.

Arvid Heise  wrote on Thu, Jun 17, 2021 at 10:13 PM:
>
> Hi Yidan,
>
> could you check if the bucket exists and is accessible? Seems like this 
> directory cannot be created 
> bos://flink-bucket/flink/ha/opera_upd_FlinkTestJob3/blob.
>
> The second issue looks like a bug. I will create a ticket.
>
> On Wed, Jun 16, 2021 at 5:21 AM yidan zhao  wrote:
>>
>> does anyone have an idea? Here is another exception stack.
>>
>>
>> Unhandled exception.
>> org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Failed
>> to serialize the result for RPC call : requestTaskManagerDetailsInfo.
>> at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:404)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$0(AkkaRpcActor.java:360)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>> ~[?:1.8.0_251] at
>> java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
>> ~[?:1.8.0_251] at
>> java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)
>> ~[?:1.8.0_251] at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:352)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:319)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [flink-dist_2.11-1.13.1.jar:1.13.1] Caused by:
>> java.io.NotSerializableException:
>> org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> ~[?:1.8.0_251] at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> ~[?:1.8.0_251] at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:387)
>> ~[flink-dist_2.11-1.13.1.jar:1.13.1] ... 27 more
>>
>> yidan zhao  wrote on Fri, Jun 11, 2021 at 4:10 PM:
>> >
>> > I upgrade flink from 1.12 to 1.13.1, and the rest api
>> > (http://xxx:8600/#/task-manager/xxx:34575-c53c6c/metrics) failed.
>> > My standalone cluster include 30 Jobmanagers and 30 Taskmanagers, and
>> > I found the api only works in the one jobmanager when it is the rest
>> > api leader.
>> >
>> > for example, jobmanager1(http://jobmanager1:8600/#/...)  and
>> > 

Re: Diagnosing bottlenecks in Flink jobs

2021-06-17 Thread Dan Hill
Thanks Jing!

On Wed, Jun 16, 2021 at 11:30 PM JING ZHANG  wrote:

> Hi Dan,
> It's better to split the Kafka partition into multiple partitions.
> Here is a way to try without splitting the Kafka partition. Add a
> rebalance shuffle between source and the downstream operators, set multiple
> parallelism for the downstream operators. But this way would introduce
> extra cpu cost for serialize/deserialize and extra network cost for shuffle
> data. I'm not sure the benefits of this method can offset the additional
> costs.
>
> Best,
> JING ZHANG
>
> Dan Hill  wrote on Thu, Jun 17, 2021 at 1:49 PM:
>
>> Thanks, JING ZHANG!
>>
>> I have one subtask for one Kafka source that is getting backpressure.  Is
>> there an easy way to split a single Kafka partition into multiple
>> subtasks?  Or do I need to split the Kafka partition?
>>
>> On Wed, Jun 16, 2021 at 10:29 PM JING ZHANG  wrote:
>>
>>> Hi Dan,
>>> Would you please describe what's the problem about your job? High
>>> latency or low throughput?
>>> Please first check the job throughput and latency .
>>> If the job throughput matches the speed of sources producing data and
>>> the latency metric is good, maybe the job works well without bottlenecks.
>>> If you find unnormal throughput or latency, please try the following
>>> points:
>>> 1. check the back pressure
>>> 2. check whether checkpoint duration is long and whether the checkpoint
>>> size is expected
>>>
>>> Please share the details for deeper analysis in this email if you find
>>> something abnormal about  the job.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Dan Hill  wrote on Thu, Jun 17, 2021 at 12:44 PM:
>>>
 We have a job that has been running but none of the AWS resource
 metrics for the EKS, EC2, MSK and EBS show any bottlenecks.  I have
 multiple 8 cores allocated but only ~2 cores are used.  Most of the memory
 is not consumed.  MSK does not show much use.  EBS metrics look mostly
 idle.  I assumed I'd be able to see whichever resources is a bottleneck.

 Is there a good way to diagnose where the bottleneck is for a Flink job?

>>>
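
A minimal sketch of the rebalance approach JING ZHANG describes above, using a stand-in source for the single-partition Kafka topic and a placeholder map function:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RebalanceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the single-partition Kafka source: only one parallel source instance.
        DataStream<String> source = env.socketTextStream("localhost", 9999);

        source
                .rebalance()                  // round-robin shuffle to all downstream subtasks
                .map(String::toUpperCase)     // placeholder for the expensive processing
                .setParallelism(8)            // downstream parallelism higher than the source's
                .print();

        env.execute("rebalance sketch");
    }
}

As noted above, the rebalance adds serialization and network cost, so it only pays off when the per-record processing, not the source itself, is the bottleneck.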


Flink submission to YARN fails

2021-06-17 Thread yangpengyi
Environment: Flink 1.12 & CDH 6.1.1
Problem:
When submitting in yarn-per-job mode, an error occurs while initializing the HDFS client. It looks like a Hadoop version compatibility issue, although judging from the stack trace the correct client jar is being used.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
the classpath, or some classes are missing from the classpath.
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:117)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:309)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:272)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:212)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:173)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
~[cloud-flinkAppCrashAnalysis-1.0.0-encodetest-RELEASE.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:172)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 2 more
Caused by: java.lang.VerifyError: Bad return type
Exception Details:
  Location:
   
org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
@157: areturn
  Reason:
Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0]) is
not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method signature)
  Current Frame:
bci: @157
flags: { }
locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
stack: { 'org/apache/hadoop/fs/ContentSummary' }
  Bytecode:
0x000: 2ab6 00b5 2a13 01f4 2bb6 00b7 4d01 4e2a
0x010: b400 422b b901 f502 003a 042c c600 1d2d
0x020: c600 152c b600 b9a7 0012 3a05 2d19 05b6
0x030: 00bb a700 072c b600 b919 04b0 3a04 1904
0x040: 4e19 04bf 3a06 2cc6 001d 2dc6 0015 2cb6
0x050: 00b9 a700 123a 072d 1907 b600 bba7 0007
0x060: 2cb6 00b9 1906 bf4d 2c07 bd00 d459 0312
0x070: d653 5904 12e0 5359 0512 e153 5906 1301
0x080: f653 b600 d74e 2dc1 01f6 9900 14b2 0023
0x090: 1301 f7b9 002b 0200 2a2b b601 f8b0 2dbf
0x0a0:
  Exception Handler Table:
bci [35, 39] => handler: 42
bci [15, 27] => handler: 60
bci [15, 27] => handler: 68
bci [78, 82] => handler: 85
bci [60, 70] => handler: 68
bci [4, 57] => handler: 103
bci [60, 103] => handler: 103
  Stackmap Table:
   
full_frame(@42,{Object[#751],Object[#774],Object[#829],Object[#799],Object[#1221]},{Object[#799]})
same_frame(@53)
same_frame(@57)
   
full_frame(@60,{Object[#751],Object[#774],Object[#829],Object[#799]},{Object[#799]})
same_locals_1_stack_item_frame(@68,Object[#799])
   
full_frame(@85,{Object[#751],Object[#774],Object[#829],Object[#799],Top,Top,Object[#799]},{Object[#799]})
same_frame(@96)
same_frame(@100)
full_frame(@103,{Object[#751],Object[#774]},{Object[#854]})
append_frame(@158,Object[#854],Object[#814])

at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:167)
~[hadoop-hdfs-client-3.0.0-cdh6.1.1.jar:?]
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:164)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at 

How to determine via reflection whether a connector supports lookup?

2021-06-17 Thread casel.chen
To find out whether a connector supports source and sink, it is enough to check whether its XXXDynamicTableFactory implements the
DynamicTableSourceFactory and DynamicTableSinkFactory interfaces. But in the source case, how can I go one step further and determine whether it supports lookup?
How do I get the actual return type of public DynamicTableSource createDynamicTableSource(Context context)?
Checking whether that actual return type implements LookupTableSource would be enough. Do I have to actually call the method via method.invoke? But the arguments require mocking quite a few objects. Is there a simpler way?
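
A minimal sketch of how far one can get without invoking the factory, assuming the connectors are on the classpath and registered through Java SPI (the usual mechanism); whether the created source supports lookup can only be seen on the instance itself, so some invocation with a (possibly mocked) context is still needed, as the question suspects:

import java.util.ServiceLoader;

import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.Factory;

public class ConnectorCapabilityScan {
    public static void main(String[] args) {
        // Table factories are discovered through Java SPI, the same mechanism the planner uses.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            boolean source = factory instanceof DynamicTableSourceFactory;
            boolean sink = factory instanceof DynamicTableSinkFactory;
            System.out.printf("%s source=%s sink=%s%n",
                    factory.factoryIdentifier(), source, sink);
        }
        // For lookup support, the created source has to be inspected, e.g.:
        //   DynamicTableSource src = sourceFactory.createDynamicTableSource(context);
        //   boolean lookup = src instanceof LookupTableSource;
        // which requires building (or mocking) a Context, as described above.
    }
}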

Recommendation for dealing with Job Graph incompatibility across varying Flink versions

2021-06-17 Thread Sonam Mandal
Hello,

We are exploring running multiple Flink clusters within a Kubernetes cluster 
such that each Flink cluster can run with a specified Flink image version. 
Since the Flink Job Graph needs to be compatible with the Flink version running 
in the Flink cluster, this brings a challenge in how we ensure that the SQL job 
graph or Flink job jars are compatible with the Flink cluster users want to run 
them on.

E.g. if the Flink cluster is running version 1.12.1, the job graph generated 
from the SQL must be created using compatible 1.12.1 Flink libraries. 
Otherwise, we see issues with deserialization etc.

Is there a recommended way to handle this scenario today?

Thanks,
Sonam


Re: EnvironmentInformation class logs secrets passed as JVM/CLI arguments

2021-06-17 Thread Jose Vargas
Hi Arvid,

I see what you mean; no solution in Flink will be able to account for the
different variations in which applications may want to pass in parameters
or the external processes or events that introspect wherever the Flink
process happens to run. I do think there is an opportunity to prevent
logging secrets by focusing on a couple of areas. The reason I think we
should improve where we can is because logs can end up in systems that a
greater number of people have access to. For example, in a given
environment, perhaps only automated systems have the ability to deploy and
instropect the servers, but engineers across teams may have access to all
logs from that environment.

The areas where I think we can prevent logging secrets are:
1) Obfuscating JVM parameters
and
2) Apply the logic in ParameterTool's "fromArgs" method to parse out
arguments in the EnvironmentInformation class.

For example, one of the documented ways of passing in AWS credentials is
via JVM parameters,
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
By leveraging ParameterTool's logic in the EnvironmentInformation class, we
can bridge the intent of the current code with how Flink's built-in
argument parser works.
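
A minimal sketch of what ParameterTool.fromArgs already does with such arguments (the argument names here are only examples):

import org.apache.flink.api.java.utils.ParameterTool;

public class ArgsParsingSketch {
    public static void main(String[] args) {
        // fromArgs understands both "--key value" and "--key=value"
        ParameterTool params = ParameterTool.fromArgs(
                new String[] {"--my-password", "VALUE_TO_HIDE", "--hostname", "localhost"});

        String host = params.get("hostname");          // "localhost"
        boolean hasSecret = params.has("my-password");

        // Log only the fact that the secret is present, never its value.
        System.out.println("hostname=" + host + ", my-password=" + (hasSecret ? "******" : ""));
    }
}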

On Thu, Jun 17, 2021 at 2:31 PM Arvid Heise  wrote:

> Hi Jose,
>
> Masking secrets is a recurring topic where ultimately you won't find a
> good solution. Your secret might for example appear in a crash dump or on
> some process monitoring application. To mask reliably you'd either need
> specific application knowledge (every user supplies arguments differently)
> or disable logging of parameters completely.
>
> Frankly speaking, I have never seen passwords being passed over CLI being
> really secure. The industry practice is to either use a sidecar approach or
> fetch secrets file-based (e.g., docker mounts). Even using ENV is
> discouraged.
>
> On Wed, Jun 16, 2021 at 11:28 PM Jose Vargas 
> wrote:
>
>> Hi,
>>
>> I am using Flink 1.13.1 and I noticed that the logs coming from the
>> EnvironmentInformation class,
>> https://github.com/apache/flink/blob/release-1.13.1/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java#L444-L467,
>> log the value of secrets that are passed in as JVM and CLI arguments. For
>> the JVM arguments, both the secret key and value are logged. For the CLI
>> arguments, the secret key is obfuscated, but the actual value of the secret
>> is not. This also affects Flink 1.12.
>>
>> For example, with CLI arguments like "--my-password VALUE_TO_HIDE", the
>> jobmanager will log the following (assuming cluster is in application mode)
>>
>> jobmanager | ** (sensitive information)
>> jobmanager | VALUE_TO_HIDE
>>
>> The key is obfuscated but the actual value isn't. This means that secret
>> values can end up in central logging systems. Passing in the CLI argument
>> as "--my-password*=*VALUE_TO_HIDE" hides the entire string but makes the
>> value unusable and is different from how the docs mentions job arguments
>> should be passed in [1].
>>
>> I saw that there was a ticket to obfuscate secrets [2], but that seems to
>> only apply to the UI, not for the configuration logs. Turning off, or
>> otherwise disabling logs from the appropriate logger is one solution, but
>> it seems to me that the logger that a user would need to turn off is
>> dependent on how the Flink cluster is running (standalone, k8s, yarn,
>> mesos, etc). Furthermore, it can be useful to see these configuration logs.
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/application_parameters/#from-the-command-line-arguments
>> [2] https://issues.apache.org/jira/browse/FLINK-14047
>>
>> Thanks,
>> --
>>
>> Jose Vargas
>>
>> Software Engineer, Data Engineering
>>
>> E: jose.var...@fiscalnote.com
>>
>> fiscalnote.com   |  info.cq.com
>>   | rollcall.com 
>>
>>

-- 

Jose Vargas

Software Engineer, Data Engineering

E: jose.var...@fiscalnote.com

fiscalnote.com   |  info.cq.com
  | rollcall.com 


Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Roman Khachatryan
Thanks for sharing,

I think the problem is that restartsState is never updated:
- on the first attempt, context.isRestored() returns false (and "never
restored" is logged)
- on subsequent attempts, it again returns false, because the state
was never updated before

Adding
if (!context.isRestored()) { restartsState.add(0L); }
should solve the problem
(it's also better to use state.update instead of state.add if only max
is needed).
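
A minimal sketch of initializeState with that change applied; this is only a fragment, reusing the restartsState field (assumed to be a ListState<Long>) and the LOG logger from the class quoted below, and it assumes java.util.Collections is imported:

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    restartsState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("restarts", Long.class));

    if (context.isRestored()) {
        long max = 0L;
        for (Long value : restartsState.get()) {
            max = Math.max(max, value);
        }
        // keep a single value: the number of restarts so far
        restartsState.update(Collections.singletonList(max + 1));
        LOG.info("restarts: {}", max + 1);
    } else {
        // register an initial value so the state is part of the very first checkpoint
        restartsState.update(Collections.singletonList(0L));
        LOG.info("restarts: never restored");
    }
}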

Regards,
Roman

On Thu, Jun 17, 2021 at 5:35 PM Felipe Gutierrez
 wrote:
>
> Sure, here it is. Nothing is mocked. I double-checked.
>
> UnitTestClass {.
> protected static LocalFlinkMiniCluster flink;
>
> @BeforeClass
> public static void prepare() {
> flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
> flink.start();
>
> TestStreamEnvironment.setAsContext(flink, PARALLELISM);
> }
>
> private static Configuration getFlinkConfiguration() {
> Configuration flinkConfig = new Configuration();
> flinkConfig.setInteger("local.number-taskmanager", 1);
> flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
> flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
> flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
> try {
> flinkConfig.setString("state.checkpoints.dir", "file://" + 
> tempFolder.newFolder().getAbsolutePath());
> } catch (IOException e) {
> throw new RuntimeException("error in flink cluster config", e);
> }
> return flinkConfig;
> }
>
>
> The class that I check if the job was restarted:
>
> public class ExceptionSimulatorProcessFunction extends 
> ProcessFunction
> implements CheckpointedFunction {
>
> final OutputTag outputTag = new OutputTag("side-output") {
> };
> private transient ListState restartsState;
> private Long restartsLocal;
> ...
> @Override
> public void processElement(Object value, Context ctx, Collector 
> out) throws Exception {
> this.currentTimeMillis = System.currentTimeMillis() - 
> currentTimeMillisBehind;
>
> // If current time is less than the reference time ahead AND we have 
> the poison auction an exception will throw
> if (this.currentTimeMillis < this.referenceTimeMillisAhead && 
> POISON__TRANSACTION_ID.equals(value.toString())) {
>
> LOG.error("This exception will trigger until the reference time 
> [{}] reaches the trigger time [{}]",
> sdfMillis.format(new Date(this.currentTimeMillis)),
> sdfMillis.format(new 
> Date(this.referenceTimeMillisAhead)));
>
> throw new SimulatedException("Transaction ID: " + 
> value.toString() +
> " not allowed. This is a simple exception for testing 
> purposes.");
> }
> out.collect(value);
>
>
> // counts the restarts
> if (restartsState != null) {
> List restoreList = Lists.newArrayList(restartsState.get());
> Long attemptsRestart = 0L;
> if (restoreList != null && !restoreList.isEmpty()) {
> attemptsRestart = Collections.max(restoreList);
> if (restartsLocal < attemptsRestart) {
> restartsLocal = attemptsRestart;
> ctx.output(outputTag, Long.valueOf(attemptsRestart));
> }
> }
> LOG.info("Attempts restart: " + attemptsRestart);
> }
> }
>
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws 
> Exception {}
>
> @Override
> public void initializeState(FunctionInitializationContext context) throws 
> Exception {
> restartsState = context.getOperatorStateStore().getListState(new 
> ListStateDescriptor("restarts", Long.class));
>
> if (context.isRestored()) {
> List restoreList = Lists.newArrayList(restartsState.get());
> if (restoreList == null || restoreList.isEmpty()) {
> restartsState.add(1L);
> LOG.info("restarts: 1");
> } else {
> Long max = Collections.max(restoreList);
> LOG.info("restarts: " + max);
> restartsState.add(max + 1);
> }
> } else {
> LOG.info("restarts: never restored");
> }
> }
> }
>
>
>
>
>
>
>
>
> On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Could you please share the test code?
>>
>> I think the returned value might depend on the level on which the
>> tests are executed. If it's a regular job then it should return the
>> correct value (as with cluster). If the environment in which the code
>> is executed is mocked then it can be false.
>>
>> Regards,
>> Roman
>>
>> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>>  wrote:
>> >
>> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone 
>> > cluster and it returns true when the application recovers. However, in 
>> > 

Re: multiple jobs in same flink app

2021-06-17 Thread Qihua Yang
Hi,

I am using application mode.

Thanks,
Qihua

On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:

> Hi Qihua,
>
> Which execution mode are you using?
>
> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> Thank you for your reply. What I want is flink app has multiple jobs,
>> each job manage a stream. Currently our flink app has only 1 job that
>> manage multiple streams.
>> I did try env.executeAsyc(), but it still doesn't work. From the log,
>> when the second executeAsync() was called, it shows " *Job
>>  was recovered successfully.*"
>> Looks like the second executeAsync() recover the first job. Not start a
>> second job.
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>>
>>> Hi,
>>>
>>> env.execute("Job 1"); is a blocking call. You either have to use
>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>> finishes then this would also work by having sequential execution.
>>>
>>> However, I think what you actually want to do is to use the same env
>>> with 2 topologies and 1 single execute like this.
>>>
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream stream1 = env.addSource(new SourceFunction());
>>> stream1.addSink(new FlinkKafkaProducer());
>>> DataStream stream2 = env.addSource(new SourceFunction());
>>> stream2.addSink(new DiscardingSink<>());
>>> env.execute("Job 1+2");
>>>
>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>>>
 Hi,
 Does anyone know how to run multiple jobs in same flink application?
 I did a simple test.  First job was started. I did see the log message,
 but I didn't see the second job was started, even I saw the log message.

 public void testJobs() throws Exception {
 StreamExecutionEnvironment env =
 StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream stream1 = env.addSource(new
 SourceFunction());
 stream1.addSink(new FlinkKafkaProducer());
 printf("### first job");
 env.execute("Job 1");

 env = StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream stream2 = env.addSource(new
 SourceFunction());
 stream2.addSink(new DiscardingSink<>());
 printf("### second job");
 env.execute("Job 2");
 }

 Here is the log:
 ### first job
 INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
  is submitted.
 INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
 Submitting Job with JobId=.
 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
 Received JobGraph submission  (job1).
 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
 Submitting job  (job1).

 INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
 Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
 INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
 execution of job job1 () under job master
 id b03cde9dc2aebdb39c46cda4c2a94c07.
 INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
 scheduling with scheduling strategy
 [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
 job1 () switched from state CREATED to
 RUNNING.

 ### second job
 WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter
 : ### second job
 INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
 ResourceManager address, beginning registration
 INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
 Starting ZooKeeperLeaderRetrievalService
 /leader//job_manager_lock.
 INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
 Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
 flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
 .
 INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
  was recovered successfully.
 INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
 Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
 flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
 .
 INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
 successfully registered at ResourceManager, leader id:
 956d4431ca90d45d92c027046cd0404e.
 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
 Requesting new slot 

Re: multiple jobs in same flink app

2021-06-17 Thread Arvid Heise
Hi Qihua,

Which execution mode are you using?

On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:

> Hi,
>
> Thank you for your reply. What I want is flink app has multiple jobs, each
> job manage a stream. Currently our flink app has only 1 job that manage
> multiple streams.
> I did try env.executeAsyc(), but it still doesn't work. From the log, when
> the second executeAsync() was called, it shows " *Job
>  was recovered successfully.*"
> Looks like the second executeAsync() recover the first job. Not start a
> second job.
>
> Thanks,
> Qihua
>
>
> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>
>> Hi,
>>
>> env.execute("Job 1"); is a blocking call. You either have to use
>> executeAsync or use a separate thread to submit the second job. If Job 1
>> finishes then this would also work by having sequential execution.
>>
>> However, I think what you actually want to do is to use the same env with
>> 2 topologies and 1 single execute like this.
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream1 = env.addSource(new SourceFunction());
>> stream1.addSink(new FlinkKafkaProducer());
>> DataStream stream2 = env.addSource(new SourceFunction());
>> stream2.addSink(new DiscardingSink<>());
>> env.execute("Job 1+2");
>>
>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>>
>>> Hi,
>>> Does anyone know how to run multiple jobs in same flink application?
>>> I did a simple test.  First job was started. I did see the log message,
>>> but I didn't see the second job was started, even I saw the log message.
>>>
>>> public void testJobs() throws Exception {
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream stream1 = env.addSource(new SourceFunction());
>>> stream1.addSink(new FlinkKafkaProducer());
>>> printf("### first job");
>>> env.execute("Job 1");
>>>
>>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream stream2 = env.addSource(new SourceFunction());
>>> stream2.addSink(new DiscardingSink<>());
>>> printf("### second job");
>>> env.execute("Job 2");
>>> }
>>>
>>> Here is the log:
>>> ### first job
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>  is submitted.
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>> Submitting Job with JobId=.
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Received JobGraph submission  (job1).
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Submitting job  (job1).
>>>
>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution
>>> of job job1 () under job master id
>>> b03cde9dc2aebdb39c46cda4c2a94c07.
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>> scheduling with scheduling strategy
>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1
>>> () switched from state CREATED to RUNNING.
>>>
>>> ### second job
>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter
>>> : ### second job
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>> ResourceManager address, beginning registration
>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Starting ZooKeeperLeaderRetrievalService
>>> /leader//job_manager_lock.
>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>> .
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>  was recovered successfully.
>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>> .
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>> successfully registered at ResourceManager, leader id:
>>> 956d4431ca90d45d92c027046cd0404e.
>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>> 

Re: EnvironmentInformation class logs secrets passed as JVM/CLI arguments

2021-06-17 Thread Arvid Heise
Hi Jose,

Masking secrets is a recurring topic where ultimately you won't find a good
solution. Your secret might for example appear in a crash dump or on some
process monitoring application. To mask reliably you'd either need specific
application knowledge (every user supplies arguments differently) or
disable logging of parameters completely.

Frankly speaking, I have never seen passwords being passed over CLI being
really secure. The industry practice is to either use a sidecar approach or
fetch secrets file-based (e.g., docker mounts). Even using ENV is
discouraged.

On Wed, Jun 16, 2021 at 11:28 PM Jose Vargas 
wrote:

> Hi,
>
> I am using Flink 1.13.1 and I noticed that the logs coming from the
> EnvironmentInformation class,
> https://github.com/apache/flink/blob/release-1.13.1/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java#L444-L467,
> log the value of secrets that are passed in as JVM and CLI arguments. For
> the JVM arguments, both the secret key and value are logged. For the CLI
> arguments, the secret key is obfuscated, but the actual value of the secret
> is not. This also affects Flink 1.12.
>
> For example, with CLI arguments like "--my-password VALUE_TO_HIDE", the
> jobmanager will log the following (assuming cluster is in application mode)
>
> jobmanager | ** (sensitive information)
> jobmanager | VALUE_TO_HIDE
>
> The key is obfuscated but the actual value isn't. This means that secret
> values can end up in central logging systems. Passing in the CLI argument
> as "--my-password*=*VALUE_TO_HIDE" hides the entire string but makes the
> value unusable and is different from how the docs mentions job arguments
> should be passed in [1].
>
> I saw that there was a ticket to obfuscate secrets [2], but that seems to
> only apply to the UI, not for the configuration logs. Turning off, or
> otherwise disabling logs from the appropriate logger is one solution, but
> it seems to me that the logger that a user would need to turn off is
> dependent on how the Flink cluster is running (standalone, k8s, yarn,
> mesos, etc). Furthermore, it can be useful to see these configuration logs.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/application_parameters/#from-the-command-line-arguments
> [2] https://issues.apache.org/jira/browse/FLINK-14047
>
> Thanks,
> --
>
> Jose Vargas
>
> Software Engineer, Data Engineering
>
> E: jose.var...@fiscalnote.com
>
> fiscalnote.com   |  info.cq.com
>   | rollcall.com 
>
>


Re: multiple jobs in same flink app

2021-06-17 Thread Qihua Yang
Hi,

Thank you for your reply. What I want is a Flink app with multiple jobs, each
job managing one stream. Currently our Flink app has only one job that manages
multiple streams.
I did try env.executeAsync(), but it still doesn't work. From the log, when
the second executeAsync() was called, it shows " *Job
 was recovered successfully.*"
It looks like the second executeAsync() recovered the first job instead of starting a
second job.

Thanks,
Qihua


On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:

> Hi,
>
> env.execute("Job 1"); is a blocking call. You either have to use
> executeAsync or use a separate thread to submit the second job. If Job 1
> finishes then this would also work by having sequential execution.
>
> However, I think what you actually want to do is to use the same env with
> 2 topologies and 1 single execute like this.
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> DataStream stream2 = env.addSource(new SourceFunction());
> stream2.addSink(new DiscardingSink<>());
> env.execute("Job 1+2");
>
> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>
>> Hi,
>> Does anyone know how to run multiple jobs in same flink application?
>> I did a simple test.  First job was started. I did see the log message,
>> but I didn't see the second job was started, even I saw the log message.
>>
>> public void testJobs() throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream1 = env.addSource(new SourceFunction());
>> stream1.addSink(new FlinkKafkaProducer());
>> printf("### first job");
>> env.execute("Job 1");
>>
>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream2 = env.addSource(new SourceFunction());
>> stream2.addSink(new DiscardingSink<>());
>> printf("### second job");
>> env.execute("Job 2");
>> }
>>
>> Here is the log:
>> ### first job
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>  is submitted.
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>> Submitting Job with JobId=.
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Received JobGraph submission  (job1).
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Submitting job  (job1).
>>
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
>> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution
>> of job job1 () under job master id
>> b03cde9dc2aebdb39c46cda4c2a94c07.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting scheduling
>> with scheduling strategy
>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1
>> () switched from state CREATED to RUNNING.
>>
>> ### second job
>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter :
>> ### second job
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>> ResourceManager address, beginning registration
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
>> ZooKeeperLeaderRetrievalService
>> /leader//job_manager_lock.
>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>> .
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>  was recovered successfully.
>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>> .
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>> successfully registered at ResourceManager, leader id:
>> 956d4431ca90d45d92c027046cd0404e.
>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
>> profile ResourceProfile{UNKNOWN} from resource manager.
>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>  with allocation id
>> 21134414fc60d4ef3e940609cef960b6.
>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
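
A minimal sketch of the executeAsync route discussed in this thread, building two topologies on one environment; whether both jobs actually start still depends on the deployment mode, and application mode may restrict this, which would match the recovery behaviour Qihua observes:

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoJobsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // first topology
        env.fromElements(1, 2, 3).print();
        JobClient job1 = env.executeAsync("Job 1");   // returns without blocking

        // second topology on the same environment (executeAsync cleared the first one)
        env.fromElements("a", "b", "c").print();
        JobClient job2 = env.executeAsync("Job 2");

        System.out.println("job1=" + job1.getJobID() + ", job2=" + job2.getJobID());
    }
}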

Re: Please advise bootstrapping large state

2021-06-17 Thread Marco Villalobos
I need to bootstrap a keyed process function.

So, I was hoping to use the Table SQL API because I thought it could
parallelize the work more efficiently via partitioning.
I need to bootstrap keyed state for a keyed process function with
Flink 1.12.1, so I think I am required to use the DataSet API.

Is my only option JdbcInputFormat?

ExecutionEnvironment batchEnv =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv =
BatchTableEnvironment.create(batchEnv);
batchTableEnv.executeSql("
CREATE TABLE my_table (

) WITH (
   'connector.type' = 'jdbc',
   'connector.url' = '?',
   'connector.username' = '?',
   'connector.password' = '?',
   'connector.table' = 'my_table'
)");

Table table = batchTableEnv.sqlQuery("SELECT name, step FROM my_table");
DataSet rowDataSet = batchTableEnv.toDataSet(table, Row.class);
rowDataSet.print();

This ends up throwing this exception:

org.apache.flink.table.api.TableException: Only BatchTableSource and
InputFormatTableSource are supported in BatchTableEnvironment.
at
org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
at
org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
at
org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
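
For reference, a minimal sketch of the JdbcInputFormat route asked about above, reading directly into a DataSet without going through the Table API; the connection settings, driver, and the two-column row type are placeholders, and it assumes flink-connector-jdbc plus the JDBC driver are on the classpath:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

public class JdbcBootstrapSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // name, step -- matching the SELECT below
        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        DataSet<Row> rows = batchEnv.createInput(
                JdbcInputFormat.buildJdbcInputFormat()
                        .setDrivername("org.postgresql.Driver")         // placeholder
                        .setDBUrl("jdbc:postgresql://host:5432/db")     // placeholder
                        .setUsername("user")                            // placeholder
                        .setPassword("password")                        // placeholder
                        .setQuery("SELECT name, step FROM my_table")
                        .setRowTypeInfo(rowType)
                        .finish());

        rows.print();
    }
}

From here the DataSet<Row> can be fed into the state processor API transforms mentioned earlier in the thread.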

On Thu, Jun 17, 2021 at 12:51 AM Timo Walther  wrote:

> Hi Marco,
>
> which operations do you want to execute in the bootstrap pipeline?
>
> Maybe you don't need to use SQL and old planner. At least this would
> simplify the friction by going through another API layer.
>
> The JDBC connector can be directly be used in DataSet API as well.
>
> Regards,
> Timo
>
>
>
> On 17.06.21 07:33, Marco Villalobos wrote:
> > Thank you very much!
> >
> > I tried using Flink's SQL JDBC connector, and ran into issues.
> > According to the flink documentation, only the old planner is compatible
> > with the DataSet API.
> >
> > When I connect to the table:
> >
> > CREATE TABLE my_table (
> > 
> > ) WITH (
> > 'connector.type' = 'jdbc',
> > 'connector.url' = '?',
> > 'connector.username' = '?',
> > 'connector.password' = '?',
> > 'connector.table' = 'my_table'
> > )
> >
> > It creates a JdbcTableSource, but only BatchTableSource and
> > InputFormatTableSource are supported in BatchTableEnvironment.
> >
> > By the way, it was very challenging to figure out how to create that
> > connection string, because its a different format than what is in the
> > documentation. I had to comb through JdbcTableSourceSinkFactory to
> > figure out how to connect.
> >
> > Is it even possible to use the DataSet API with the Table SQL api in
> > Flink 1.12.1?
> >
> >
> > On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger  > > wrote:
> >
> > Hi Marco,
> >
> > The DataSet API will not run out of memory, as it spills to disk if
> > the data doesn't fit anymore.
> > Load is distributed by partitioning data.
> >
> > Giving you advice depends a bit on the use-case. I would explore two
> > major options:
> > a) reading the data from postgres using Flink's SQL JDBC connector
> > [1]. 200 GB is not much data. A 1gb network link needs ~30 minutes
> > to transfer that (125 megabytes / second)
> > b) Using the DataSet API and state processor API. I would first try
> > to see how much effort it is to read the data using the DataSet API
> > (could be less convenient than the Flink SQL JDBC connector).
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
> > <
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
> >
> >
> >
> > On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos
> > mailto:mvillalo...@kineteque.com>>
> wrote:
> >
> > I must bootstrap state from postgres (approximately 200 GB of
> > data) and I notice that the state processor API requires the
> > DataSet API in order to bootstrap state for the Stream API.
> >
> > I wish there was a way to use the SQL API and use a partitioned
> > scan, but I don't know if that is even possible with the DataSet
> > API.
> >
> > I never used the DataSet API, and I am unsure how it manages
> > memory, or distributes load, when handling large state.
> >
> > Would it run out of memory if I map data from a JDBCInputFormat
> > into a large DataSet and then use that to bootstrap state for my
> > stream job?
> >
> > Any advice on how I should proceed with this would be greatly
> > appreciated.
> >
> > Thank you.
> >
>
>


Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Felipe Gutierrez
Sure, here it is. Nothing is mocked. I double-checked.

UnitTestClass {.
protected static LocalFlinkMiniCluster flink;

@BeforeClass
public static void prepare() {
flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false);
flink.start();

TestStreamEnvironment.setAsContext(flink, PARALLELISM);
}

private static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setInteger("local.number-taskmanager", 1);
flinkConfig.setInteger("taskmanager.numberOfTaskSlots", 8);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString("restart-strategy.fixed-delay.delay", "0 s");
try {
flinkConfig.setString("state.checkpoints.dir", "file://" +
tempFolder.newFolder().getAbsolutePath());
} catch (IOException e) {
throw new RuntimeException("error in flink cluster config", e);
}
return flinkConfig;
}


The class that I check if the job was restarted:

public class ExceptionSimulatorProcessFunction extends
ProcessFunction
implements CheckpointedFunction {

final OutputTag outputTag = new OutputTag("side-output") {
};
private transient ListState restartsState;
private Long restartsLocal;
...
@Override
public void processElement(Object value, Context ctx, Collector
out) throws Exception {
this.currentTimeMillis = System.currentTimeMillis() -
currentTimeMillisBehind;

// If current time is less than the reference time ahead AND we
have the poison auction an exception will throw
if (this.currentTimeMillis < this.referenceTimeMillisAhead &&
POISON__TRANSACTION_ID.equals(value.toString())) {

LOG.error("This exception will trigger until the reference time
[{}] reaches the trigger time [{}]",
sdfMillis.format(new Date(this.currentTimeMillis)),
sdfMillis.format(new
Date(this.referenceTimeMillisAhead)));

throw new SimulatedException("Transaction ID: " +
value.toString() +
" not allowed. This is a simple exception for testing
purposes.");
}
out.collect(value);


// counts the restarts
if (restartsState != null) {
List restoreList =
Lists.newArrayList(restartsState.get());
Long attemptsRestart = 0L;
if (restoreList != null && !restoreList.isEmpty()) {
attemptsRestart = Collections.max(restoreList);
if (restartsLocal < attemptsRestart) {
restartsLocal = attemptsRestart;
ctx.output(outputTag, Long.valueOf(attemptsRestart));
}
}
LOG.info("Attempts restart: " + attemptsRestart);
}
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {}

@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
restartsState = context.getOperatorStateStore().getListState(new
ListStateDescriptor("restarts", Long.class));

if (context.isRestored()) {
List restoreList =
Lists.newArrayList(restartsState.get());
if (restoreList == null || restoreList.isEmpty()) {
restartsState.add(1L);
LOG.info("restarts: 1");
} else {
Long max = Collections.max(restoreList);
LOG.info("restarts: " + max);
restartsState.add(max + 1);
}
} else {
LOG.info("restarts: never restored");
}
}
}








On Thu, Jun 17, 2021 at 5:20 PM Roman Khachatryan  wrote:

> Hi,
>
> Could you please share the test code?
>
> I think the returned value might depend on the level on which the
> tests are executed. If it's a regular job then it should return the
> correct value (as with cluster). If the environment in which the code
> is executed is mocked then it can be false.
>
> Regards,
> Roman
>
> On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
>  wrote:
> >
> > Yes, I have state on the ProcessFunction. I tested it on a stand-alone
> cluster and it returns true when the application recovers. However, in
> integration tests it does not returns true. I am using Flink 1.4. Do you
> know where it is saying at Flink release 1.13 (
> https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I
> cannot see `isRestored()` equals true on integration tests?
> >
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> >
> >
> > On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise  wrote:
> >>
> >> Does your ProcessFunction has state? If not it would be in line with
> the documentation.
> >>
> >> Also which Flink version are you using? Before Flink 1.13 empty state
> was omitted so I could imagine that `isRestored()` would return false but
> it should actually now also return true for empty state.
> >>
> >> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <

Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Roman Khachatryan
Hi,

Could you please share the test code?

I think the returned value might depend on the level on which the
tests are executed. If it's a regular job then it should return the
correct value (as with cluster). If the environment in which the code
is executed is mocked then it can be false.

Regards,
Roman

On Thu, Jun 17, 2021 at 4:15 PM Felipe Gutierrez
 wrote:
>
> Yes, I have state on the ProcessFunction. I tested it on a stand-alone 
> cluster and it returns true when the application recovers. However, in 
> integration tests it does not returns true. I am using Flink 1.4. Do you know 
> where it is saying at Flink release 1.13 
> (https://flink.apache.org/news/2021/05/03/release-1.13.0.html) that I cannot 
> see `isRestored()` equals true on integration tests?
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
>
>
> On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise  wrote:
>>
>> Does your ProcessFunction has state? If not it would be in line with the 
>> documentation.
>>
>> Also which Flink version are you using? Before Flink 1.13 empty state was 
>> omitted so I could imagine that `isRestored()` would return false but it 
>> should actually now also return true for empty state.
>>
>> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez 
>>  wrote:
>>>
>>> So, I was trying to improve by using the CheckpointedFunction as it shows 
>>> here [1]. But the method isRestored() says in its documentation [2]:
>>>
>>> "Returns true, if state was restored from the snapshot of a previous 
>>> execution. This returns always false for stateless tasks."
>>>
>>> It is weird because I am extending a ProcessFunction which is a 
>>> RichFunction.
>>>
>>> public class AuctionExceptionSimulatorProcessFunction extends 
>>> ProcessFunction
>>> implements CheckpointedFunction {
>>> ...
>>>
>>> In the end, I cannot rely on the "isRestored()". Do you know what could be 
>>> wrong? I used the same implementation method of [1].
>>>
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>>> [2] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>>
>>>
>>> --
>>> -- Felipe Gutierrez
>>> -- skype: felipe.o.gutierrez
>>>
>>>
>>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan  wrote:

 You can also use accumulators [1] to collect the number of restarts
 (and then access it via client); but side outputs should work as well.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters

 Regards,
 Roman

 On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
  wrote:
 >
 > I just realised that only the ProcessFunction is enough. I don't need 
 > the CheckpointFunction.
 >
 >
 > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, 
 >  wrote:
 >>
 >> Cool!
 >>
 >> I did using this example 
 >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
 >>  because I don't have a keyed stream on the specific operator that I 
 >> want to count the number of restarts. (yes I am using version 1.4 
 >> unfortunately).
 >>
 >> Because I need to test it in an integration test I am using a side 
 >> output 
 >> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
 >>  to attach a sink. I am not sure if you have a better idea to test the 
 >> restarts on an integration test. If you have a simple idea please tell 
 >> me :). This was the way that I solved
 >>
 >> Thanks
 >> Felipe
 >>
 >> --
 >> -- Felipe Gutierrez
 >> -- skype: felipe.o.gutierrez
 >>
 >>
 >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan  
 >> wrote:
 >>>
 >>> Hi Felipe,
 >>>
 >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
 >>> that depending on the configuration only a pipeline region can be
 >>> restarted, not the whole job).
 >>>
 >>> But if all you want is to check whether it's a first attempt or not,
 >>> you can also call context.isRestored() from initializeState() [2]
 >>>
 >>> [1]
 >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
 >>>
 >>> [2]
 >>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
 >>>
 >>> Regards,
 >>> Roman
 >>>
 >>>
 >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
 >>>  wrote:
 >>> >
 >>> > Hello community,
 >>> >
 >>> > Is it possible to know programmatically how many times my Flink 
 >>> > stream job restarted 

Re: How to use onTimer() on event stream for *ProcessFunction?

2021-06-17 Thread Felipe Gutierrez
I didn't know that I don't need to implement CheckpointedFunction if I use
ListState. However, I considered this answer (
https://stackoverflow.com/a/47443537/2096986) where Fabian says:

"You can store parts of the operator state also in the ListState (instead
of holding it on the heap) but it will be quite expensive to access
individual elements because you have to traverse an iterator."

So maybe implementing CheckpointedFunction and saving the state in snapshotState()
is still better? Maybe not? What do you think?

Would you please consider shedding some light on this question related to
CoProcessFunction and event-time triggers after a job failure? (
https://lists.apache.org/x/thread.html/r5f74099a7b91b4ad47ac7612631f7e03d08c0e1d374487da55aa1a31@%3Cuser.flink.apache.org%3E
)

Thank you very much!
Felipe
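
For reference, a minimal sketch of the pattern Fabian describes there: keep the working copy on the heap for cheap per-record access and only write it into the ListState in snapshotState(). Field names are illustrative:

    private transient ListState<Long> checkpointedRestarts; // managed operator state
    private long restartsOnHeap;                             // cheap to read/update per record

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedRestarts.clear();
        checkpointedRestarts.add(restartsOnHeap);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedRestarts = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("restarts", Long.class));
        if (context.isRestored()) {
            for (Long value : checkpointedRestarts.get()) {
                restartsOnHeap = Math.max(restartsOnHeap, value);
            }
            restartsOnHeap++; // one more restore means one more restart
        }
    }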



On Thu, Jun 17, 2021 at 4:38 PM Arvid Heise  wrote:

> Let's start in reverse: you don't need to implement CheckpointedFunction
> if you use managed state (ListState is managed).
>
> Now to the question of how you should implement onTimer. That's up to you
> and heavily depends on your use case.
> The first onTimer implementation is called 60s after an element of key X
> has been processed. If you have additional elements with key X, you get
> additional timers. However, in this example, only the latest timer should
> actually output data (think of some session detection). That's why the
> implementation checks if it is indeed the last timer or not before
> outputting elements.
>
> The other implementation always outputs elements independent of additional
> timers/elements being added.
>
> On Wed, Jun 16, 2021 at 4:08 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi community,
>>
>> I don't understand why that KeyedProcessFunction.onTimer() is implemented
>> here [1] is different from here [2]. Both are KeyedProcessFunction and they
>> aim to fire a window on event time. At [1] the events are emitted at if
>> (timestamp == result.lastModified + 6) and the time is scheduled from
>> the ctx.timestamp().
>>
>> public void processElement(). {
>> current.lastModified = ctx.timestamp();
>> ctx.timerService().registerEventTimeTimer(current.lastModified + 6);
>> }
>> public void onTimer() {
>> // get the state for the key that scheduled the timer
>> CountWithTimestamp result = state.value();
>> // check if this is an outdated timer or the latest timer
>> if (timestamp == result.lastModified + 6) {
>> // emit the state on timeout
>> out.collect(new Tuple2(result.key,
>> result.count));
>> }
>>
>> At [2] there is no comparison of time on the onTimer() method. Plus the
>> events are scheduled using a formula (eventTime - (eventTime %
>> durationMsec) + durationMsec - 1) and only if they are not late of the
>> watermark (eventTime <= timerService.currentWatermark()).
>>
>> public void processElement(). {
>> if (eventTime <= timerService.currentWatermark()) {
>> // This event is late; its window has already been triggered.
>> } else {
>> // Round up eventTime to the end of the window containing this
>> event.
>> long endOfWindow = (eventTime - (eventTime % durationMsec) +
>> durationMsec - 1);
>> }
>> public void onTimer() {
>> Float sumOfTips = this.sumOfTips.get(timestamp);
>>
>>
>> My use case uses a CoProcessFunction and I am saving the states
>> on ListState. It works fine with the approach [1]. When I used the approach
>> [2] some of the events are late because of the watermark.
>>
>> What is the correct to be used? Or what is the best?
>>
>> Afterwards I have to make this function fault tolerant. So, my next
>> question is. Do I have to implement CheckpointedFunction and
>> CheckpointedRestoring. Or because I am using ListState it already recovers
>> from failures?
>>
>> Thanks,
>> Felipe
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/event_driven/#the-ontimer-method
>>
>> *--*
>> *-- Felipe Gutierrez*
>> *-- skype: felipe.o.gutierrez*
>>
>


Tasks stay in SCHEDULED state after submitting a job to Flink

2021-06-17 Thread Lei Wang
I started Flink in standalone mode on a single machine; after submitting a job I get:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate the required slot within slot request timeout. Please
make sure that the cluster has enough resources.

There are plenty of free slots.
I am using flink-1.11.2; this looks similar to https://issues.apache.org/jira/browse/FLINK-19237,
but I cannot figure out what it means.

The strange thing is that the same steps work fine on other servers. Can anyone help explain this?


Thanks,
王磊


Re: Corrupted unaligned checkpoints in Flink 1.11.1

2021-06-17 Thread Alexander Filipchik
Did some more digging.
1) is not an option as we are not doing any cleanups at the moment. We keep
the last 4 checkpoints per job + all the savepoints.
2) I looked at job deployments that happened 1 week before the incident. We
have 23 deployments in total and each resulted in a unique job id. I also
looked at job specific metrics and I don't see any evidence of overlapping
checkpointing. There is exactly 1 checkpoint per application, every time with a
different job id, and once a new job starts checkpointing there are no
checkpoints left from the previous job id.

A bit of a mystery. Is there a way to at least catch it in the future? Is there any
additional telemetry (logs, metrics) we could extract to better understand
what is happening?

Alex
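
One low-effort option for extra telemetry (whether it captures enough detail is an open question) is to raise the log level for the checkpoint coordinator and the state registry, which should at least record which shared files each checkpoint registers and later discards:

# log4j.properties sketch, JobManager side
logger.checkpointing.name = org.apache.flink.runtime.checkpoint
logger.checkpointing.level = DEBUG
logger.statebackend.name = org.apache.flink.runtime.state
logger.statebackend.level = DEBUG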

On Tue, Jun 8, 2021 at 12:01 AM Piotr Nowojski  wrote:

> Re-adding user mailing list
>
> Hey Alex,
>
> In that case I can see two scenarios that could lead to missing files.
> Keep in mind that incremental checkpoints are referencing previous
> checkpoints in order to minimise the size of the checkpoint (roughly
> speaking only changes since the previous checkpoint are being
> persisted/uploaded/written). Checkpoint number 42, can reference an
> arbitrary number of previous checkpoints. I suspect that somehow, some of
> those previously referenced checkpoints got deleted and removed. Also keep
> in mind that savepoints (as of now) are never incremental, they are always
> full checkpoints. However externalised checkpoints can be incremental. Back
> to the scenarios:
> 1. You might have accidentally removed some older checkpoints from your
> Job2, maybe thinking they are no longer needed. Maybe you have just kept
> this single externalised checkpoint directory from steps T3 or T4,
> disregarding that this externalised checkpoint might be referencing
> previous checkpoints of Job2?
> 2. As I mentioned, Flink is automatically maintaining reference counts of
> the used files and deletes them when they are no longer used/referenced.
> However this works only within a single job/cluster. For example if between
> steps T3 and T4, you restarted Job2 and let it run for a bit, it could take
> more checkpoints that would subsume files that were still part of the
> externalised checkpoint that you previously used to start Job3/Job4. Job2
> would have no idea that Job3/Job4 exist, let alone that they are
> referencing some files from Job2, and those files could have been deleted
> as soon as Job2 was no longer using/referencing them.
>
> Could one of those happen in your case?
>
> Best, Piotrek
>
> pon., 7 cze 2021 o 20:01 Alexander Filipchik 
> napisał(a):
>
>> Yes, we do use incremental checkpoints.
>>
>> Alex
>>
>> On Mon, Jun 7, 2021 at 3:12 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Alex,
>>>
>>> A quick question. Are you using incremental checkpoints?
>>>
>>> Best, Piotrek
>>>
>>> sob., 5 cze 2021 o 21:23  napisał(a):
>>>
 Small correction, in T4 and T5 I mean Job2, not Job 1 (as job 1 was
 save pointed).

 Thank you,
 Alex

 On Jun 4, 2021, at 3:07 PM, Alexander Filipchik 
 wrote:

 
 Looked through the logs and didn't see anything fishy that indicated an
 exception during checkpointing.
 To make it clearer, here is the timeline (we use unaligned checkpoints,
 and state size around 300Gb):

 T1: Job1 was running
 T2: Job1 was savepointed, brought down and replaced with Job2.
 T3: Attempts to savepoint Job2 failed (timed out). Job2 was cancelled,
 brought down and replaced by Job3 that was restored from extarnilized
 checkpoint of Job2
 T3: Attempts to savepoint Job3 failed (timed out). Job3 was cancelled,
 brought down and replaced by Job4 that was restored from extarnilized
 checkpoint of Job3
 T4: We realized that jobs were timing out to savepoint due to local
 disk throttling. We provisioned disk with more throughput and IO. Job4 was
 cancelled, Job4 was deployed and restored from externilized checkpoint of
 Job3, but failed as it couldn't find some files in the folder that belongs
 to the checkpoint of *Job1*
 T5: We tried to redeploy and restore from checkpoints of Job3 and Job2,
 but all the attempts failed on reading files from the *folder that
 belongs to the checkpoint of Job1*

 We checked the content of the folder containing checkpoints of Job1,
 and it has files. Not sure what is pointing tho missing files and what
 could've removed them.

 Any way we can figure out what could've happened? Is there a tool that
 can read the checkpoint and check whether it is valid?

 Alex

 On Thu, Jun 3, 2021 at 2:12 PM Alexander Filipchik <
 afilipc...@gmail.com> wrote:

> On the checkpoints -> what kind of issues should I check for? I was
> looking for metrics and it looks like they were reporting successful
> checkpoints. It looks like some files were removed in the shared folder,
> but I'm not sure 

Re: multiple jobs in same flink app

2021-06-17 Thread Arvid Heise
Hi,

env.execute("Job 1"); is a blocking call. You either have to use
executeAsync or use a separate thread to submit the second job. If Job 1
finishes then this would also work by having sequential execution.

However, I think what you actually want to do is to use the same env with 2
topologies and 1 single execute like this.

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream stream1 = env.addSource(new SourceFunction());
stream1.addSink(new FlinkKafkaProducer());
DataStream stream2 = env.addSource(new SourceFunction());
stream2.addSink(new DiscardingSink<>());
env.execute("Job 1+2");
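
If the two pipelines really do have to be submitted as separate jobs from the same main(), a sketch of the non-blocking variant looks like this; the fromElements/DiscardingSink placeholders stand in for the real sources and sinks:

StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
env1.fromElements("a", "b").addSink(new DiscardingSink<>());
JobClient job1 = env1.executeAsync("Job 1"); // returns as soon as the job is submitted

StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
env2.fromElements("c", "d").addSink(new DiscardingSink<>());
JobClient job2 = env2.executeAsync("Job 2");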

On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:

> Hi,
> Does anyone know how to run multiple jobs in same flink application?
> I did a simple test.  First job was started. I did see the log message,
> but I didn't see the second job was started, even I saw the log message.
>
> public void testJobs() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> printf("### first job");
> env.execute("Job 1");
>
> env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream2 = env.addSource(new SourceFunction());
> stream2.addSink(new DiscardingSink<>());
> printf("### second job");
> env.execute("Job 2");
> }
>
> Here is the log:
> ### first job
> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>  is submitted.
> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
> Submitting Job with JobId=.
> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
> JobGraph submission  (job1).
> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
> Submitting job  (job1).
>
> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution
> of job job1 () under job master id
> b03cde9dc2aebdb39c46cda4c2a94c07.
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting scheduling
> with scheduling strategy
> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1
> () switched from state CREATED to RUNNING.
>
> ### second job
> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter :
> ### second job
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
> ResourceManager address, beginning registration
> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
> ZooKeeperLeaderRetrievalService
> /leader//job_manager_lock.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
> .
> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>  was recovered successfully.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
> .
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
> successfully registered at ResourceManager, leader id:
> 956d4431ca90d45d92c027046cd0404e.
> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
> profile ResourceProfile{UNKNOWN} from resource manager.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Request slot with profile ResourceProfile{UNKNOWN} for job
>  with allocation id
> 21134414fc60d4ef3e940609cef960b6.
> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
> profile ResourceProfile{UNKNOWN} from resource manager.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Request slot with profile ResourceProfile{UNKNOWN} for job
>  with allocation id
> 650bd9100a35ef5086fd4614f5253b55.
>


Re: How to use onTimer() on event stream for *ProcessFunction?

2021-06-17 Thread Arvid Heise
Let's start in reverse: you don't need to implement CheckpointedFunction if
you use managed state (ListState is managed).

Now to the question of how you should implement onTimer. That's up to you
and heavily depends on your use case.
The first onTimer implementation is called 60s after an element of key X
has been processed. If you have additional elements with key X, you get
additional timers. However, in this example, only the latest timer should
actually output data (think of some session detection). That's why the
implementation checks if it is indeed the last timer or not before
outputting elements.

The other implementation always outputs elements independent of additional
timers/elements being added.
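
Another way to avoid acting on outdated timers at all is to delete the previously registered timer whenever a newer element arrives. A minimal sketch of that variant (the String types and the 60s gap are illustrative):

public class LatestTimerFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> lastTimer;

    @Override
    public void open(Configuration parameters) {
        lastTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastTimer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long previous = lastTimer.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous); // cancel the outdated timer
        }
        long newTimer = ctx.timestamp() + 60_000L;
        ctx.timerService().registerEventTimeTimer(newTimer);
        lastTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // only the latest timer can still be pending, so no "is this the latest?" check is needed
        out.collect("fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}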

On Wed, Jun 16, 2021 at 4:08 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I don't understand why that KeyedProcessFunction.onTimer() is implemented
> here [1] is different from here [2]. Both are KeyedProcessFunction and they
> aim to fire a window on event time. At [1] the events are emitted at if
> (timestamp == result.lastModified + 6) and the time is scheduled from
> the ctx.timestamp().
>
> public void processElement(). {
> current.lastModified = ctx.timestamp();
> ctx.timerService().registerEventTimeTimer(current.lastModified + 6);
> }
> public void onTimer() {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 6) {
> // emit the state on timeout
> out.collect(new Tuple2(result.key,
> result.count));
> }
>
> At [2] there is no comparison of time on the onTimer() method. Plus the
> events are scheduled using a formula (eventTime - (eventTime %
> durationMsec) + durationMsec - 1) and only if they are not late of the
> watermark (eventTime <= timerService.currentWatermark()).
>
> public void processElement(). {
> if (eventTime <= timerService.currentWatermark()) {
> // This event is late; its window has already been triggered.
> } else {
> // Round up eventTime to the end of the window containing this
> event.
> long endOfWindow = (eventTime - (eventTime % durationMsec) +
> durationMsec - 1);
> }
> public void onTimer() {
> Float sumOfTips = this.sumOfTips.get(timestamp);
>
>
> My use case uses a CoProcessFunction and I am saving the states
> on ListState. It works fine with the approach [1]. When I used the approach
> [2] some of the events are late because of the watermark.
>
> What is the correct to be used? Or what is the best?
>
> Afterwards I have to make this function fault tolerant. So, my next
> question is. Do I have to implement CheckpointedFunction and
> CheckpointedRestoring. Or because I am using ListState it already recovers
> from failures?
>
> Thanks,
> Felipe
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/event_driven/#the-ontimer-method
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>


Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Felipe Gutierrez
Yes, I have state in the ProcessFunction. I tested it on a standalone
cluster and it returns true when the application recovers. However, in
integration tests it does not return true. I am using Flink 1.4. Do you
know where the Flink 1.13 release announcement (
https://flink.apache.org/news/2021/05/03/release-1.13.0.html) says that I cannot
see `isRestored()` equal to true in integration tests?

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Thu, Jun 17, 2021 at 4:09 PM Arvid Heise  wrote:

> Does your ProcessFunction has state? If not it would be in line with the
> documentation.
>
> Also which Flink version are you using? Before Flink 1.13 empty state was
> omitted so I could imagine that `isRestored()` would return false but it
> should actually now also return true for empty state.
>
> On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> So, I was trying to improve by using the CheckpointedFunction as it shows
>> here [1]. But the method isRestored() says in its documentation [2]:
>>
>> "Returns true, if state was restored from the snapshot of a previous
>> execution. This returns always false for stateless tasks."
>>
>> It is weird because I am extending a ProcessFunction which is a
>> RichFunction.
>>
>> public class AuctionExceptionSimulatorProcessFunction extends
>> ProcessFunction
>> implements CheckpointedFunction {
>> ...
>>
>> In the end, I cannot rely on the "isRestored()". Do you know what could
>> be wrong? I used the same implementation method of [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>
>>
>> *--*
>> *-- Felipe Gutierrez*
>> *-- skype: felipe.o.gutierrez*
>>
>>
>> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan 
>> wrote:
>>
>>> You can also use accumulators [1] to collect the number of restarts
>>> (and then access it via client); but side outputs should work as well.
>>>
>>> [1]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>>
>>> Regards,
>>> Roman
>>>
>>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>>  wrote:
>>> >
>>> > I just realised that only the ProcessFunction is enough. I don't need
>>> the CheckpointFunction.
>>> >
>>> >
>>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
>>> felipe.o.gutier...@gmail.com> wrote:
>>> >>
>>> >> Cool!
>>> >>
>>> >> I did using this example
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>>> because I don't have a keyed stream on the specific operator that I want to
>>> count the number of restarts. (yes I am using version 1.4 unfortunately).
>>> >>
>>> >> Because I need to test it in an integration test I am using a side
>>> output (
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>>> to attach a sink. I am not sure if you have a better idea to test the
>>> restarts on an integration test. If you have a simple idea please tell me
>>> :). This was the way that I solved
>>> >>
>>> >> Thanks
>>> >> Felipe
>>> >>
>>> >> --
>>> >> -- Felipe Gutierrez
>>> >> -- skype: felipe.o.gutierrez
>>> >>
>>> >>
>>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan 
>>> wrote:
>>> >>>
>>> >>> Hi Felipe,
>>> >>>
>>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>>> >>> that depending on the configuration only a pipeline region can be
>>> >>> restarted, not the whole job).
>>> >>>
>>> >>> But if all you want is to check whether it's a first attempt or not,
>>> >>> you can also call context.isRestored() from initializeState() [2]
>>> >>>
>>> >>> [1]
>>> >>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>>> >>>
>>> >>> [2]
>>> >>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>>> >>>
>>> >>> Regards,
>>> >>> Roman
>>> >>>
>>> >>>
>>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>>> >>>  wrote:
>>> >>> >
>>> >>> > Hello community,
>>> >>> >
>>> >>> > Is it possible to know programmatically how many times my Flink
>>> stream job restarted since it was running?
>>> >>> >
>>> >>> > My use case is like this. I have an Unit test that uses checkpoint
>>> and I throw one exception in a MapFunction for a given time, i.e.: for the
>>> 2 seconds ahead. Because Flink restarts the job and I have checkpoint I can
>>> recover the state and after 2 seconds I don't throw any exception anymore.
>>> Then I would like to know how many times the job was restarted.
>>> >>> >
>>> >>> > Thanks,
>>> >>> > Felipe
>>> >>> >
>>>
>>


Re: RocksDB CPU resource usage

2021-06-17 Thread Padarn Wilson
Thanks both for the suggestions, all good ideas. I will try some of the
profiling suggestions and report back.
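
As a starting point, the built-in flame graphs mentioned in the thread (Flink 1.13+) only need one switch in flink-conf.yaml (off by default because the sampling adds some overhead), roughly:

rest.flamegraph.enabled: true

The async-profiler route is complementary, since it can also show the native RocksDB threads that the JVM-level view misses.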

On Thu, Jun 17, 2021 at 4:13 PM Yun Tang  wrote:

> Hi Padarn,
>
> From my experiences, de-/serialization might not consume 3x CPU usage, and
> the background compaction could also increase the CPU usage. You could use
> async-profiler [1] to figure out what really consumed your CPU usage as it
> could also detect the native RocksDB thread stack.
>
>
> [1] https://github.com/jvm-profiling-tools/async-profiler
>
> Best
> Yun Tang
>
> --
> *From:* Robert Metzger 
> *Sent:* Thursday, June 17, 2021 14:11
> *To:* Padarn Wilson 
> *Cc:* JING ZHANG ; user 
> *Subject:* Re: RocksDB CPU resource usage
>
> If you are able to execute your job locally as well (with enough data),
> you can also run it with a profiler and see the CPU cycles spent on
> serialization (you can also use RocksDB locally)
>
> On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson  wrote:
>
> Thanks Robert. I think it would be easy enough to test this hypothesis by
> making the same comparison with some simpler state inside the aggregation
> window.
>
> On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:
>
> Depending on the datatypes you are using, seeing 3x more CPU usage seems
> realistic.
> Serialization can be quite expensive. See also:
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> Maybe it makes sense to optimize there a bit.
>
> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>
> Hi Padarn,
> After switch stateBackend from filesystem to rocksdb, all reads/writes
> from/to backend have to go through de-/serialization to retrieve/store the
> state objects, this may cause more cpu cost.
> But I'm not sure it is the main reason leads to 3x CPU cost in your job.
> To find out the reason, we need more profile on CPU cost, such as Flame
> Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively supported
> in Flink[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> Best,
> JING ZHANG
>
> Padarn Wilson  于2021年6月15日周二 下午5:05写道:
>
> Hi all,
>
> We have a job that we just enabled rocksdb on (instead of file backend),
> and see that the CPU usage is almost 3x greater on (we had to increase
> taskmanagers 3x to get it to run.
>
> I don't really understand this, is there something we can look at to
> understand why CPU use is so high? Our state mostly consists of aggregation
> windows.
>
> Cheers,
> Padarn
>
>


Re: How to know (in code) how many times the job restarted?

2021-06-17 Thread Arvid Heise
Does your ProcessFunction have state? If not, it would be in line with the
documentation.

Also which Flink version are you using? Before Flink 1.13 empty state was
omitted so I could imagine that `isRestored()` would return false but it
should actually now also return true for empty state.

On Tue, Jun 15, 2021 at 9:02 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> So, I was trying to improve by using the CheckpointedFunction as it shows
> here [1]. But the method isRestored() says in its documentation [2]:
>
> "Returns true, if state was restored from the snapshot of a previous
> execution. This returns always false for stateless tasks."
>
> It is weird because I am extending a ProcessFunction which is a
> RichFunction.
>
> public class AuctionExceptionSimulatorProcessFunction extends
> ProcessFunction
> implements CheckpointedFunction {
> ...
>
> In the end, I cannot rely on the "isRestored()". Do you know what could be
> wrong? I used the same implementation method of [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Mon, Jun 14, 2021 at 3:39 PM Roman Khachatryan 
> wrote:
>
>> You can also use accumulators [1] to collect the number of restarts
>> (and then access it via client); but side outputs should work as well.
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html#accumulators--counters
>>
>> Regards,
>> Roman
>>
>> On Sun, Jun 13, 2021 at 10:01 PM Felipe Gutierrez
>>  wrote:
>> >
>> > I just realised that only the ProcessFunction is enough. I don't need
>> the CheckpointFunction.
>> >
>> >
>> > On Fri, 11 Jun 2021, 18:11 Felipe Gutierrez, <
>> felipe.o.gutier...@gmail.com> wrote:
>> >>
>> >> Cool!
>> >>
>> >> I did using this example
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> because I don't have a keyed stream on the specific operator that I want to
>> count the number of restarts. (yes I am using version 1.4 unfortunately).
>> >>
>> >> Because I need to test it in an integration test I am using a side
>> output (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html)
>> to attach a sink. I am not sure if you have a better idea to test the
>> restarts on an integration test. If you have a simple idea please tell me
>> :). This was the way that I solved
>> >>
>> >> Thanks
>> >> Felipe
>> >>
>> >> --
>> >> -- Felipe Gutierrez
>> >> -- skype: felipe.o.gutierrez
>> >>
>> >>
>> >> On Thu, Jun 10, 2021 at 5:41 PM Roman Khachatryan 
>> wrote:
>> >>>
>> >>> Hi Felipe,
>> >>>
>> >>> You can use getRuntimeContext().getAttemptNumber() [1] (but beware
>> >>> that depending on the configuration only a pipeline region can be
>> >>> restarted, not the whole job).
>> >>>
>> >>> But if all you want is to check whether it's a first attempt or not,
>> >>> you can also call context.isRestored() from initializeState() [2]
>> >>>
>> >>> [1]
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getAttemptNumber--
>> >>>
>> >>> [2]
>> >>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/state/ManagedInitializationContext.html#isRestored--
>> >>>
>> >>> Regards,
>> >>> Roman
>> >>>
>> >>>
>> >>> On Thu, Jun 10, 2021 at 5:30 PM Felipe Gutierrez
>> >>>  wrote:
>> >>> >
>> >>> > Hello community,
>> >>> >
>> >>> > Is it possible to know programmatically how many times my Flink
>> stream job restarted since it was running?
>> >>> >
>> >>> > My use case is like this. I have an Unit test that uses checkpoint
>> and I throw one exception in a MapFunction for a given time, i.e.: for the
>> 2 seconds ahead. Because Flink restarts the job and I have checkpoint I can
>> recover the state and after 2 seconds I don't throw any exception anymore.
>> Then I would like to know how many times the job was restarted.
>> >>> >
>> >>> > Thanks,
>> >>> > Felipe
>> >>> >
>>
>


Re: Error with extracted type from custom partitioner key

2021-06-17 Thread Arvid Heise
This could be a bug but I'd need to see more of the DataStream code to be
sure. Could you share that code?

On Sat, Jun 12, 2021 at 9:56 PM Ken Krugler 
wrote:

> Hi Timo,
>
> Thanks, I’ll give the ResultTypeQueryable interface a try - my previous
> experience registering custom Kryo serializers wasn’t so positive.
>
> Though I’m still curious as to whether java.lang.ClassCastException I got
> was representative of a bug in Flink, or my doing something wrong.
>
> But with the ongoing deprecation of DataSet support, I imagine that’s a
> low priority issue in any case.
>
> Regards,
>
> — Ken
>
>
> On Jun 4, 2021, at 7:05 AM, Timo Walther  wrote:
>
> Hi Ken,
>
> non-POJOs are serialized with Kryo. This might not give you optimal
> performance. You can register a custom Kryo serializer in ExecutionConfig
> to speed up the serialization.
>
> Alternatively, you can implement `ResultTypeQueryable` provide a custom
> type information with a custom serializer.
>
> I hope this helps. Otherwise can you share a little example how you would
> like to cann partitionCustom()?
>
> Regards,
> Timo
>
> On 04.06.21 15:38, Ken Krugler wrote:
>
> Hi all,
> I'm using Flink 1.12 and a custom partitioner/partitioning key (batch
> mode, with a DataSet) to do a better job of distributing data to tasks. The
> classes look like:
> public class MyPartitioner implements Partitioner
> {
> ...
> }
> public class MyGroupingKey implements Comparable
> {
> ...
> }
> This worked fine, but I noticed a warning logged by Flink
> about MyGroupingKey not having an empty constructor, and thus not being
> treated as a POJO.
> I added that empty constructor, and then I got an error
> because partitionCustom() only works on a single field key.
> So I changed MyGroupingKey to have a single field (a string), with
> transient cached values for the pieces of the key that I need while
> partitioning. Now I get an odd error:
> java.lang.RuntimeException: Error while calling custom partitioner
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast
> to MyGroupingKey
> at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
> at
> org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
> ... 19 more
> So I've got two questions…
> • Should I just get rid of the empty constructor, and have Flink treat it
> as a non-POJO? This seemed to be working fine.
> • Is it a bug in Flink that the extracted field from the key is being used
> as the expected type for partitioning?
> Thanks!
> — Ken
> --
> Ken Krugler
> http://www.scaleunlimited.com 
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
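
Following up on the ResultTypeQueryable suggestion above, a minimal sketch of implementing it on the key selector so Flink gets explicit type information for MyGroupingKey; MyRecord and fromRecord() are stand-ins for the real types:

public class MyGroupingKeySelector
        implements KeySelector<MyRecord, MyGroupingKey>, ResultTypeQueryable<MyGroupingKey> {

    @Override
    public MyGroupingKey getKey(MyRecord record) {
        return MyGroupingKey.fromRecord(record); // hypothetical factory method
    }

    @Override
    public TypeInformation<MyGroupingKey> getProducedType() {
        // TypeInformation.of() falls back to a generic (Kryo) type here;
        // a hand-written TypeInformation/serializer could be returned instead.
        return TypeInformation.of(MyGroupingKey.class);
    }
}

// usage: dataSet.partitionCustom(new MyPartitioner(), new MyGroupingKeySelector());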


Re: Output from RichAsyncFunction on failure

2021-06-17 Thread Arvid Heise
Hi Satish,

usually you would use side outputs [1] for that, but afaik asyncIO doesn't
support them (yet).
So your option of using some union type works well. You can then chain a map
function that uses side outputs.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/side_output/
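
A rough sketch of that shape, with the async function emitting a union/wrapper type and a downstream function splitting it; EnrichmentResult, EnrichAsyncFunction and the failure Kafka sink are assumed names, not existing APIs:

final OutputTag<EnrichmentResult> failedTag = new OutputTag<EnrichmentResult>("failed-calls") {};

SingleOutputStreamOperator<EnrichmentResult> enriched = AsyncDataStream
        .unorderedWait(events, new EnrichAsyncFunction(), 30, TimeUnit.SECONDS, 100)
        .process(new ProcessFunction<EnrichmentResult, EnrichmentResult>() {
            @Override
            public void processElement(EnrichmentResult value, Context ctx,
                                       Collector<EnrichmentResult> out) {
                if (value.isFailure()) {
                    ctx.output(failedTag, value);  // routed to the failure topic
                } else {
                    out.collect(value);            // normal downstream processing
                }
            }
        });

enriched.getSideOutput(failedTag).addSink(failedCallsKafkaSink);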

On Fri, Jun 11, 2021 at 7:49 PM Satish Saley  wrote:

> One way I thought to achieve this is -
> 1. For failures, add a special record to collection in RichAsyncFunction
> 2. Filter out those special records from the DataStream and push to
> another Kafka
> Let me know if it makes sense.
>
>
> On Fri, Jun 11, 2021 at 10:40 AM Satish Saley 
> wrote:
>
>> Hi,
>> - I have a kafka consumer to read events.
>> - Then, I have RichAsyncFunction to call a remote service to get
>> more information about that event.
>>
>> If the remote call fails after X number of retries, I don't want flink to
>> fail the job and start processing from the beginning. Instead I would like
>> to push info about failed call to another Kafka topic. Is there a way to
>> achieve this?
>>
>


Re: Kafka Connector Topic Discovery

2021-06-17 Thread Arvid Heise
Hi Nick,

This looks like a valid use case and shouldn't fail. The only workaround I
see is to create some dummy topics then.
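
For reference, the pattern-based subscription with periodic discovery looks roughly like this (regex, interval and connection properties are illustrative); as noted above, it currently still needs at least one matching topic to exist at startup:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "my-group");
// re-scan for new matching topics/partitions every 30s
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        Pattern.compile("metrics-.*"),   // dynamically created topics
        new SimpleStringSchema(),
        props);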

On Fri, Jun 11, 2021 at 12:11 AM Martin, Nick J [US] (SP) <
nick.mar...@ngc.com> wrote:

> I’m trying to use the topic discovery feature of the Kafka Connector. The
> problem I’m having is that Kafka Consumers fail to start if there are no
> topics matching the topic regex when they start up. Is this intended
> behavior? Is there some other property I could set to just continue
> discovery until they find a matching topic?
>
>
>
> Background:
>
> My application uses dynamically generated topic names where specific
> messages are sent on different topics based on some metadata in the
> messages. A producer service reads the metadata, determines the topic the
> data should be sent to, applies some validation logic to the topic name,
> creates that topic if it doesn’t already exist, and then sends the data.
> The problem is, when starting the whole stack with a fresh Kafka cluster,
> my Flink job with the Kafka consumer can’t start until the producer service
> has been started and has processed some data so that at least one matching
> topic exists. This sort of startup order dependency is obviously
> undesirable in a distributed microservice architecture.
>
>
>
> Are there existing features/configuration settings that solve this
> problem? Should I open a ticket?
>


Re: Flink and Avro for state serialization

2021-06-17 Thread Arvid Heise
Hi Yashwant,

I don't know Beam well, so you might also want to ask on their user list.
But I'll try to answer it from Flink's perspective.

If you want to work with Avro, you should use an AvroSerializer which
supports schema evolution in the best possible way.
The PojoSerializer also allows small modifications, but the error looks more
like plain Java serialization (Serializable) was actually used.

If you need further help, please provide the full stacktrace.
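
When the types go through Flink's own type extraction, pinning Avro serialization explicitly looks roughly like this; MyAvroRecord is a placeholder, and whether this takes effect through the Beam runner's coders is an assumption worth verifying:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();

// or, for explicit state declarations, pin the type information directly:
ValueStateDescriptor<MyAvroRecord> descriptor =
        new ValueStateDescriptor<>("enriched", new AvroTypeInfo<>(MyAvroRecord.class));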

On Thu, Jun 10, 2021 at 10:49 PM Yashwant Ganti 
wrote:

> Hello all,
>
> We are running some Flink jobs - we wrote the job in Beam but are using
> the Flink Runner and are seeing the following error when restarting the job
> from a Savepoint
>
> Caused by: java.io.InvalidClassException: com.xxx.xxx; local class
>> incompatible: stream classdesc serialVersionUID = -5544933308624767500,
>> local class serialVersionUID = -7822035950742261370
>
>
> Here is what happened
>
>- The Type in question is an Avro Type - so we have a
>*`PCollection` in the job.
>- We updated the Avro schema and by default the generated class will
>have a new serialVersionUID in Avro (the serialVersionUIDs above line up
>with the ones in the generated Avro classes)
>- We did not use any custom serializers for this type so I believe it
>would have been covered by Flink's POJO serializer (through Beam) and that
>is breaking because of the serialVersionUID change
>
>
> I am wondering how to work around this without losing my savepoint. We are
> going to try the following way and was wondering if the community had any
> suggestions
>
>- Add flink-avro into the job jar as mentioned in
>
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro.
>I am not sure this would work because the original bytes were written out
>by the POJO serializer and that is probably going to be used for
>deserialization? There must be some record of which serializer wrote out
>the bytes and I am not sure how to override that
>- I also wanted to make sure for future use cases that including the
>avro jar on the classpath will only affect Avro types by default
>
> Thanks,
> Yash
>
>
>


Re: Chinese characters garbled when consuming Kafka and writing to Doris with Flink SQL

2021-06-17 Thread JasonLee
hi


You could first use the print connector to print out the consumed data and check whether it is already garbled there, or whether the garbling only shows up after writing to Doris.


Best
JasonLee
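
A minimal sketch of that print-sink check, with the column list abbreviated the same way as in the DDL below:

CREATE TABLE datacollect_business_print (
  `id` varchar(36),
  `chain_id` varchar(36),
  `app_id` varchar(32),
  ...
) WITH (
  'connector' = 'print'
);

INSERT INTO datacollect_business_print
SELECT * FROM datacollect_business_kafka;
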
On Jun 17, 2021, 21:31, maker_d...@foxmail.com wrote:
I am using Flink SQL to consume from Kafka and write the data to Doris, but Chinese characters come out garbled.

The SQL is as follows:

CREATE TABLE `datacollect_business_kafka` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'kafka',
'topic' = 'datacollect_business_stage',
'properties.bootstrap.servers' = 'XXX',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);

CREATE TABLE `datacollect_business_doris` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'doris',
'fenodes' = 'XXX',
'table.identifier' = 'stage_datacollect.datacollect_business',
'username' = 'XXX',
'password' = 'XXX',
'sink.batch.size' = '1'
);

insert into datacollect_business_doris select * from datacollect_business_kafka;


Following suggestions found online, I added this to flink-conf.yaml: env.java.opts: "-Dfile.encoding=UTF-8"

Flink version: 1.12.4
Deployment mode: on YARN

Hoping someone can help.
Thanks!


maker_d...@foxmail.com


Chinese characters garbled when consuming Kafka and writing to Doris with Flink SQL

2021-06-17 Thread maker_d...@foxmail.com
I am using Flink SQL to consume from Kafka and write the data to Doris, but Chinese characters come out garbled.

The SQL is as follows:

CREATE TABLE `datacollect_business_kafka` (
  `id` varchar(36),
  `chain_id` varchar(36),
  `app_id` varchar(32) ,
...
 CHARACTER SET `UTF-8`
) WITH (
  'connector' = 'kafka',
  'topic' = 'datacollect_business_stage',
  'properties.bootstrap.servers' = 'XXX',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE `datacollect_business_doris` (
  `id` varchar(36),
  `chain_id` varchar(36),
  `app_id` varchar(32) ,
  ...
   CHARACTER SET `UTF-8`
) WITH (
'connector' = 'doris',
'fenodes' = 'XXX',
'table.identifier' = 'stage_datacollect.datacollect_business',
'username' = 'XXX',
'password' = 'XXX',
'sink.batch.size' = '1'
);

insert into datacollect_business_doris select * from datacollect_business_kafka;


Following suggestions found online, I added this to flink-conf.yaml: env.java.opts: "-Dfile.encoding=UTF-8"

Flink version: 1.12.4
Deployment mode: on YARN

Hoping someone can help.
Thanks!


maker_d...@foxmail.com


Re: Re: Re: Re: Failed to cancel a job using the STOP rest API

2021-06-17 Thread Piotr Nowojski
Hi Thomas. The bug https://issues.apache.org/jira/browse/FLINK-21028 is
still present in 1.12.1. You would need to upgrade to at least 1.13.0,
1.12.2 or 1.11.4. However as I mentioned before, 1.11.4 hasn't yet been
released. On the other hand both 1.12.2 and 1.13.0 have already been
superseded by more recent minor releases (1.12.4 and 1.13.1, respectively).

Piotrek

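For reference, the stop request being discussed looks roughly like this (host, job id and savepoint path are placeholders); the drain flag controls whether remaining in-flight data is drained before stopping:

curl -X POST http://<jobmanager>:8081/jobs/<job-id>/stop \
     -H 'Content-Type: application/json' \
     -d '{"targetDirectory": "s3://bucket/savepoints", "drain": false}'
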
śr., 16 cze 2021 o 06:01 Thomas Wang  napisał(a):

> Thanks everyone. I'm using Flink on EMR. I just updated to EMR 6.3 which
> uses Flink 1.12.1. I will report back whether this resolves the issue.
>
> Thomas
>
> On Wed, Jun 9, 2021 at 11:15 PM Yun Gao  wrote:
>
>> Very thanks Kezhu for the catch, it also looks to me the same issue as
>> FLINK-21028.
>>
>> --
>> From:Piotr Nowojski 
>> Send Time:2021 Jun. 9 (Wed.) 22:12
>> To:Kezhu Wang 
>> Cc:Thomas Wang ; Yun Gao ; user <
>> user@flink.apache.org>
>> Subject:Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
>>
>> Yes good catch Kezhu, IllegalStateException sounds very much like
>> FLINK-21028.
>>
>> Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't
>> been released yet)?
>>
>> Piotrek
>>
>> wt., 8 cze 2021 o 17:18 Kezhu Wang  napisał(a):
>> Could it be same as FLINK-21028[1] (titled as “Streaming application
>> didn’t stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0) ?
>>
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-21028
>>
>>
>> Best,
>> Kezhu Wang
>>
>> On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:
>> Hi Thomas,
>>
>> I tried but do not re-produce the exception yet. I have filed
>> an issue for the exception first [1].
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22928
>>
>>
>> --Original Mail --
>> *Sender:*Thomas Wang 
>> *Send Date:*Tue Jun 8 07:45:52 2021
>> *Recipients:*Yun Gao 
>> *CC:*user 
>> *Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API
>> This is actually a very simple job that reads from Kafka and writes to S3
>> using the StreamingFileSink w/ Parquet format. I'm all using Flink's API
>> and nothing custom.
>>
>> Thomas
>>
>> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao  wrote:
>> Hi Thoms,
>>
>> Very thanks for reporting the exceptions, and it seems to be not work as
>> expected to me...
>> Could you also show us the dag of the job ? And does some operators in
>> the source task
>> use multiple-threads to emit records?
>>
>> Best,
>> Yun
>>
>>
>> --Original Mail --
>> *Sender:*Thomas Wang 
>> *Send Date:*Sun Jun 6 04:02:20 2021
>> *Recipients:*Yun Gao 
>> *CC:*user 
>> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>> One thing I noticed is that if I set drain = true, the job could be
>> stopped correctly. Maybe that's because I'm using a Parquet file sink which
>> is a bulk-encoded format and only writes to disk during checkpoints?
>>
>> Thomas
>>
>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang  wrote:
>> Hi Yun,
>>
>> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
>> not quite sure what they mean though. Any hints?
>>
>> Thanks.
>>
>> Thomas
>>
>> ```
>> 2021-06-05 10:02:51
>> java.util.concurrent.ExecutionException:
>> org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture
>> .java:357)
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:
>> 1928)
>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>> .quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>> .close(StreamOperatorWrapper.java:130)
>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>> .close(StreamOperatorWrapper.java:134)
>> at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>> .close(StreamOperatorWrapper.java:80)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .closeOperators(OperatorChain.java:302)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(
>> StreamTask.java:576)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:544)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>> at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>> at org.apache.flink.streaming.api.operators.CountingOutput
>> .emitWatermark(CountingOutput.java:41)
>> at org.apache.flink.streaming.runtime.operators.
>> 

Re:Re: Flink sql case when problem

2021-06-17 Thread 纳兰清风
Hi Leonard Xu,

The version is 1.13. Is it a bug? I noticed that the type of column `b` is 
integer, but I use it as varchar.
What should the expected behavior be?

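For what it's worth, a workaround that avoids mixing the INT column with string literals is to keep the CASE branches numeric, roughly:

insert into sink_tb(a, c)
select a,
       case
           when b is null or b = 0 then '1000-01-01 00:00:00'
           else from_unixtime(cast(b as bigint))
       end as c
from source_tb;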



At 2021-06-17 20:11:24, "Leonard Xu"  wrote:

Hi, houying


It looks like a bug in the generated operator code. Which Flink version are you using?
Could you help create a JIRA ticket?




Best,
Leonard




On Jun 17, 2021, at 19:48, 纳兰清风 wrote:


Hello,


When I am using case when statement in flink sql, I got an error as follow:


org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common 
type of GeneratedExpression(field$3,isNull$3,,INT,None) and 
ArrayBuffer(GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData)
 str$4),false,,CHAR(0) NOT NULL,Some()), 
GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) 
str$5),false,,CHAR(1) NOT NULL,Some(0))).
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:354)
at scala.Option.orElse(Option.scala:289)
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:354)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:141)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
..


My SQL is 
create table source_tb (
a varchar,
b INTEGER
) with (
'connector' = 'kafka',
...
);

create table sink_tb (
a varchar,
c varchar
) with (
'connector' = 'console',
'format' = 'rich-json'
);

insert into sink_tb(a, c)
select a,
case 
when b is null or b = '' or b = '0' then '1000-01-01 00:00:00'
else from_unixtime(cast(b as bigint))
end as c
from source_tb;
But it works well when I change the when statement to  b is null or b in ('', 
'0')


Does anyone have idea about this ?





 




Re: Re: Re: Re: Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-17 Thread yidan zhao
I thought about it some more: my cluster consists of containers running on internal servers, and traffic between the containers should not go through NAT.

That said, the network monitoring does show quite a lot of connections in TIME-WAIT on many machines, around 50k+, but I don't think that alone should be causing this problem.

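To double-check the settings discussed in the quoted thread below, something along these lines on the affected hosts (commands are illustrative):

# check the relevant kernel settings
sysctl net.ipv4.tcp_tw_reuse net.ipv4.tcp_timestamps
sysctl net.ipv4.tcp_tw_recycle   # may no longer exist on newer kernels
# rough count of connections sitting in TIME-WAIT
ss -tan state time-wait | wc -l
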
On Thu, Jun 17, 2021 at 2:48 PM, 东东 wrote:
>
> With both of these enabled, the timestamps on connection requests from the same source IP must be monotonically increasing; otherwise (non-increasing) requests are treated as invalid and the packets are dropped, which on the client side looks like occasional connection timeouts.
>
>
> Normally a single machine does not have this problem, because there is only one clock; it tends to show up behind NAT (since the clocks of multiple hosts are usually not perfectly in sync). I don't know your exact setup, so all I can say is give it a try.
>
>
> Finally, it is worth discussing with your ops team: unless you are sure that no connections come in through NAT, it is better not to have both of these enabled at the same time.
>
>
> PS: kernel 4.1 already dropped this tcp_tw_reuse thing, because too many people fell into this trap
>
>
> At 2021-06-17 14:07:50, "yidan zhao" wrote:
> > What is the mechanism behind this? I cannot change this setting directly myself; it has to go through an approval process.
> >
> > On Thu, Jun 17, 2021 at 1:36 PM, 东东 wrote:
> >>
> >>
> >> Set one of the two to 0
> >>
> >>
> >> At 2021-06-17 13:11:01, "yidan zhao" wrote:
> >> > Yes, it is the host machine's IP.
> >> >
> >> > net.ipv4.tcp_tw_reuse = 1
> >> > net.ipv4.tcp_timestamps = 1
> >> >
> >> > On Thu, Jun 17, 2021 at 12:52 PM, 东东 wrote:
> >> >>
> >> >> Is 10.35.215.18 the host machine's IP?
> >> >>
> >> >> Check what tcp_tw_recycle and net.ipv4.tcp_timestamps are set to.
> >> >> If nothing else works, use tcpdump.
> >> >>
> >> >>
> >> >> At 2021-06-17 12:41:58, "yidan zhao" wrote:
> >> >> > @东东 It is a standalone cluster. The errors happen at random times, one every now and then, with no fixed pattern. There seems to be some correlation with CPU, memory and network, but I am not sure, since it is not very obvious. I checked a few of the exceptions: some of them line up with network spikes in time, but not all of them, so it is hard to say whether that is the cause.
> >> >> >
> >> >> > Besides, one thing I am not clear about: there is very little on the web about this error. Similar reports are all about RemoteTransportException, with a hint that the taskmanager may have been lost. Mine is a LocalTransportException, and I am not sure whether these two errors mean different things in netty. So far I cannot find much material about either exception online.
> >> >> >
> >> >> > On Thu, Jun 17, 2021 at 11:19 AM, 东东 wrote:
> >> >> >>
> >> >> >> Single-machine standalone, or Docker/K8s?
> >> >> >>
> >> >> >>
> >> >> >> Does the exception occur periodically, or is it correlated with changes in CPU, memory, or even network traffic?
> >> >> >>
> >> >> >>
> >> >> >> At 2021-06-16 19:10:24, "yidan zhao" wrote:
> >> >> >> >Hi, yingjie.
> >> >> >> >If the network is not stable, which config parameter I should 
> >> >> >> >adjust.
> >> >> >> >
> >> >> >> >yidan zhao  于2021年6月16日周三 下午6:56写道:
> >> >> >> >>
> >> >> >> >> 2: I use G1, and no full gc occurred, young gc count: 422, time:
> >> >> >> >> 142892, so it is not bad.
> >> >> >> >> 3: stream job.
> >> >> >> >> 4: I will try to config taskmanager.network.retries which is 
> >> >> >> >> default
> >> >> >> >> 0, and taskmanager.network.netty.client.connectTimeoutSec 's 
> >> >> >> >> default
> >> >> >> >> is 120s。
> >> >> >> >> 5: I checked the net fd number of the taskmanager, it is about 
> >> >> >> >> 1000+,
> >> >> >> >> so I think it is a reasonable value.
> >> >> >> >>
> >> >> >> >> 1: can not be sure.
> >> >> >> >>
> >> >> >> >> Yingjie Cao  于2021年6月16日周三 下午4:34写道:
> >> >> >> >> >
> >> >> >> >> > Hi yidan,
> >> >> >> >> >
> >> >> >> >> > 1. Is the network stable?
> >> >> >> >> > 2. Is there any GC problem?
> >> >> >> >> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] 
> >> >> >> >> > for more information.
> >> >> >> >> > 4. You may try to config these two options: 
> >> >> >> >> > taskmanager.network.retries, 
> >> >> >> >> > taskmanager.network.netty.client.connectTimeoutSec. More 
> >> >> >> >> > relevant options can be found in 'Data Transport Network Stack' 
> >> >> >> >> > section of [2].
> >> >> >> >> > 5. If it is not the above cases, it is may related to [3], you 
> >> >> >> >> > may need to check the number of tcp connection per TM and node.
> >> >> >> >> >
> >> >> >> >> > Hope this helps.
> >> >> >> >> >
> >> >> >> >> > [1] 
> >> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> >> >> >> >> > [2] 
> >> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> >> >> >> >> > [3] https://issues.apache.org/jira/browse/FLINK-22643
> >> >> >> >> >
> >> >> >> >> > Best,
> >> >> >> >> > Yingjie
> >> >> >> >> >
> >> >> >> >> > yidan zhao  于2021年6月16日周三 下午3:36写道:
> >> >> >> >> >>
> >> >> >> >> >> Attachment is the exception stack from flink's web-ui. Does 
> >> >> >> >> >> anyone
> >> >> >> >> >> have also met this problem?
> >> >> >> >> >>
> >> >> >> >> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 
> >> >> >> >> >> containers,
> >> >> >> >> >> each 28G mem.


Re: Flink sql case when problem

2021-06-17 Thread Leonard Xu
Hi, houying

It looks like a bug in the generated operator code. Which Flink version
are you using?
Could you help create a JIRA ticket?


Best,
Leonard


> On Jun 17, 2021, at 19:48, 纳兰清风  wrote:
> 
> Hello,
> 
> When I am using case when statement in flink sql, I got an error as 
> follow:
> 
> org.apache.flink.table.planner.codegen.CodeGenException: Unable to find 
> common type of GeneratedExpression(field$3,isNull$3,,INT,None) and 
> ArrayBuffer(GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData)
>  str$4),false,,CHAR(0) NOT NULL,Some()), 
> GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) 
> str$5),false,,CHAR(1) NOT NULL,Some(0))).
>   at 
> org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:354)
>   at scala.Option.orElse(Option.scala:289)
>   at 
> org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:354)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:141)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>   at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
>   at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   ..
> 
> My SQL is 
> create table source_tb (
> a varchar,
> b INTEGER
> ) with (
> 'connector' = 'kafka',
> ...
> );
> 
> create table sink_tb (
> a varchar,
> c varchar
> ) with (
> 'connector' = 'console',
> 'format' = 'rich-json'
> );
> 
> insert into sink_tb(a, c)
> select a,
> case 
> when b is null or b = '' or b = '0' then '1000-01-01 00:00:00'
> else from_unixtime(cast(b as bigint))
> end as c
> from source_tb;
> But it works well when I change the when statement to  b is null or b in ('', 
> '0')
> 
> Does anyone have idea about this ?
> 
> 
> 
>  



Flink sql case when problem

2021-06-17 Thread 纳兰清风
Hello,


When I am using case when statement in flink sql, I got an error as follow:


org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common 
type of GeneratedExpression(field$3,isNull$3,,INT,None) and 
ArrayBuffer(GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData)
 str$4),false,,CHAR(0) NOT NULL,Some()), 
GeneratedExpression(((org.apache.flink.table.data.binary.BinaryStringData) 
str$5),false,,CHAR(1) NOT NULL,Some(0))).
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:354)
at scala.Option.orElse(Option.scala:289)
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:354)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$4(CalcCodeGenerator.scala:141)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
..


My SQL is 
create table source_tb (
a varchar,
b INTEGER
) with (
'connector' = 'kafka',
...
);

create table sink_tb (
a varchar,
c varchar
) with (
'connector' = 'console',
'format' = 'rich-json'
);

insert into sink_tb(a, c)
select a,
case 
when b is null or b = '' or b = '0' then '1000-01-01 00:00:00'
else from_unixtime(cast(b as bigint))
end as c
from source_tb;
But it works well when I change the WHEN condition to: b is null or b in ('', '0').


Does anyone have an idea about this?
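
Until the planner issue is resolved, one possible workaround (untested sketch, it simply keeps the comparison on a single type so the code generator does not have to find a common type between INT and CHAR literals):

insert into sink_tb(a, c)
select a,
    case
        when b is null or cast(b as string) in ('', '0') then '1000-01-01 00:00:00'
        else from_unixtime(cast(b as bigint))
    end as c
from source_tb;

Since b is declared as INTEGER, comparing it directly with a numeric literal (when b is null or b = 0) should also avoid the INT/string mix entirely.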



How to make onTimer() trigger on a CoProcessFunction after a failure?

2021-06-17 Thread Felipe Gutierrez
Hi community,

I have implemented a join function using CoProcessFunction with
CheckpointedFunction to recover from failures. I added some debug lines to
check if it is restoring and it does. Before the crash, I process events
that fall at processElement2. I create snapshots at snapshotState(), the
application comes back and restores the events. That is fine.

After the restore, I process events that fall on processElement1. I
register event timers for them as I did on processElement2 before the
crash. But the onTimer() is never called. The point is that I don't have
any events to send to processElement2() to make the CoProcessFunction
register a time for them. They were sent before the crash.

I suppose that the onTimer() is called only when there are
"timerService.registerEventTimeTimer(endOfWindow);" for processElement1 and
processElement2. Because when I test the same application without crashing
and the CoProcessFunction triggers the onTimer() method.

But if I have a crash in the middle the CoProcessFunction does not call
onTimer(). Why is that? Is that normal? What do I have to do to make the
CoProcessFunction trigger the onTime() method even if only one stream is
processed let's say at the processElement2() method and the other stream is
restored from a snapshot? I imagine that I have to register a time during
the recovery (initializeState()). But how?

thanks,
Felipe
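
A note on the usual pattern, hedged: as far as I know, timers registered through the TimerService of a keyed co-process function are part of Flink's checkpoints and are restored automatically, so explicit re-registration in initializeState() should not be needed. Also, event-time timers only fire once the watermark passes them, and for a two-input operator the watermark is the minimum of both inputs; if one input stays idle after the restore, the combined watermark cannot advance and onTimer() will never be called. Below is a minimal sketch of registering the same end-of-window timer from both sides; KeyedCoProcessFunction, the Left/Right/Out types, getTimestamp() and WINDOW_SIZE are assumptions, not taken from the original job.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class JoinSketch extends KeyedCoProcessFunction<String, Left, Right, Out> {

    private static final long WINDOW_SIZE = 60_000L; // assumption: 1-minute windows

    private transient ValueState<Long> windowEnd;

    @Override
    public void open(Configuration parameters) {
        windowEnd = getRuntimeContext().getState(
                new ValueStateDescriptor<>("window-end", Long.class));
    }

    // Both inputs register the same end-of-window timer; registering the same
    // timestamp twice is a no-op, so either side alone is enough to keep it alive.
    private void registerWindowTimer(Context ctx, long eventTime) throws Exception {
        long end = eventTime - (eventTime % WINDOW_SIZE) + WINDOW_SIZE;
        ctx.timerService().registerEventTimeTimer(end);
        windowEnd.update(end);
    }

    @Override
    public void processElement1(Left value, Context ctx, Collector<Out> out) throws Exception {
        registerWindowTimer(ctx, value.getTimestamp());
        // ... buffer value in keyed state ...
    }

    @Override
    public void processElement2(Right value, Context ctx, Collector<Out> out) throws Exception {
        registerWindowTimer(ctx, value.getTimestamp());
        // ... buffer value in keyed state ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Out> out) {
        // ... emit the join result for the closed window ...
    }
}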


Re: Flink parameter configuration does not take effect

2021-06-17 Thread Jason Lee
Hi Robert,


Thank you for your enthusiastic answer


I have understood the current problem and look forward to a good solution and 
optimization by the community. I will continue to pay attention to changes in 
the community.


Best,
Jason


JasonLee1781
jasonlee1...@163.com


On 06/17/2021 15:45,Robert Metzger wrote:
Hi Jason,


I hope you don't mind that I brought back the conversation to the user@ mailing 
list, so that others can benefit from the information as well.


Thanks a lot for sharing your use case. I personally believe that Flink should 
support invocations like "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob 
flink-stream-sql-ddl-1.0.0.jar ./config.json". There is no fundamental reason 
why this can not be supported.


The Javadoc about tableEnv.getConfig() mentions that the config is only about 
the "runtime behavior":
https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L1151

... but I see how this is not clearly defined.


As a short-term fix, I've proposed to clarify in the configuration table which 
options are cluster vs job configurations: 
https://issues.apache.org/jira/browse/FLINK-22257.


But in the long term, we certainly need to improve the user experience.




On Wed, Jun 16, 2021 at 3:31 PM Jason Lee  wrote:

Dear Robert,


For tasks running on the cluster, some parameter configurations are global, but 
some parameter configurations need to be customized, such as some memory 
settings of TaskManager. For tasks with different state sizes, we need to 
configure different parameters. These parameters should not  be configured in 
flink-config.yaml. But for the current Flink, these parameters cannot be 
configured through StreamExecutionEnvironment, and some parameters are not 
effective if set through StreamTableEnvironment.


At the same time, Configuration is immutable after the task is started, which 
is correct, but I think some global parameters should also be specified in 
StreamExecutionEnvironment. At present, some parameters of checkpoint are also 
set globally, but they can be set through "StreamExecutionEnvironment 
.getCheckpointConfig().set()", then why can't the parameters of TaskManager's 
memory be set in this way? I think that setting the global parameters by "flink 
run -yD" is the same as setting by "StreamExecutionEnvironment". I am not sure 
if I understand it correctly.


I agree with you. I think we need to specify in the configuration of the 
official document that those parameters are best configured in 
flink-config.yaml. Those parameters can be modified in 
"StreamExecutionEnvironment", and those can only be passed through others 
Modified in the way. I think the document will be clearer for users.


Best,
Jason
JasonLee1781
jasonlee1...@163.com


On 06/16/2021 21:04,Jason Lee wrote:
Dear Robert,


Thanks for your answer


Our Flink SQL task is deployed through Per job.


We provide our users with a platform for developing Flink SQL tasks. We will 
write the user's SQL code and configuration parameters into a Config.json file. 
At the same time, we develop a Flink Jar task at the bottom to actually execute 
the user's SQL through the command line. To perform this task, for example, the 
following is our instruction to start a Flink SQL task: "flink run -m 
yarn-cluster xxx.FlinkStreamSQLDDLJob flink-stream-sql-ddl-1.0.0.jar 
./config.json". In order to facilitate the user's personalized configuration 
parameters, we want to set user configuration parameters in the execution 
environment of the FlinkStreamSQLDDLJob class that we have implemented, such as 
the "taskmanager.memory.managed.fraction" parameter, but it is currently 
impossible to configure through the Flink execution environment These 
parameters, because they are not effective, can only be configured by flink run 
-yD.


I think the configuration in the official document states that those parameters 
cannot be set through 
"StreamTableEnvironment.getConfig.getConfiguration().set()", but can only be 
set through flink run -yD or configured in flink-conf.yaml. If the current 
document does not explain it, it will not take effect if you use the 
"StreamTableEnvironment.getConfig.getConfiguration().set()" method to set some 
parameters. In order to increase the use of personalized configuration 
parameters for users, I think these instructions can appear in the 
Configuration of the official document.


Best,
Jason


JasonLee1781
jasonlee1...@163.com


On 06/16/2021 18:37,Robert Metzger wrote:
Hi Jason,


How are you deploying your Flink SQL tasks? (are you using per-job/application 
clusters, or a session cluster? )

I agree that the configuration management is not optimal in Flink. By default, 
I would recommend assuming that all configuration parameters are cluster 
settings, which require a cluster restart. Very few options (mostly 

Flink job submission to YARN fails

2021-06-17 Thread yangpengyi
Flink version: Flink 1.12
Hadoop environment: CDH 6.1.1
Submission in yarn-per-job mode fails. Judging from the stack trace, the error happens while initializing the HDFS connection, but it looks like the correct hdfs-client jar is being used, so why is there still this return-type problem?

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
the classpath, or some classes are missing from the classpath.
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:117)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:309)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:272)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:212)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:173)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
~[cloud-flinkAppCrashAnalysis-1.0.0-encodetest-RELEASE.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:172)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
... 2 more
Caused by: java.lang.VerifyError: Bad return type
Exception Details:
  Location:
   
org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
@157: areturn
  Reason:
Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0]) is
not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method signature)
  Current Frame:
bci: @157
flags: { }
locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
stack: { 'org/apache/hadoop/fs/ContentSummary' }
  Bytecode:
0x000: 2ab6 00b5 2a13 01f4 2bb6 00b7 4d01 4e2a
0x010: b400 422b b901 f502 003a 042c c600 1d2d
0x020: c600 152c b600 b9a7 0012 3a05 2d19 05b6
0x030: 00bb a700 072c b600 b919 04b0 3a04 1904
0x040: 4e19 04bf 3a06 2cc6 001d 2dc6 0015 2cb6
0x050: 00b9 a700 123a 072d 1907 b600 bba7 0007
0x060: 2cb6 00b9 1906 bf4d 2c07 bd00 d459 0312
0x070: d653 5904 12e0 5359 0512 e153 5906 1301
0x080: f653 b600 d74e 2dc1 01f6 9900 14b2 0023
0x090: 1301 f7b9 002b 0200 2a2b b601 f8b0 2dbf
0x0a0:
  Exception Handler Table:
bci [35, 39] => handler: 42
bci [15, 27] => handler: 60
bci [15, 27] => handler: 68
bci [78, 82] => handler: 85
bci [60, 70] => handler: 68
bci [4, 57] => handler: 103
bci [60, 103] => handler: 103
  Stackmap Table:
   
full_frame(@42,{Object[#751],Object[#774],Object[#829],Object[#799],Object[#1221]},{Object[#799]})
same_frame(@53)
same_frame(@57)
   
full_frame(@60,{Object[#751],Object[#774],Object[#829],Object[#799]},{Object[#799]})
same_locals_1_stack_item_frame(@68,Object[#799])
   
full_frame(@85,{Object[#751],Object[#774],Object[#829],Object[#799],Top,Top,Object[#799]},{Object[#799]})
same_frame(@96)
same_frame(@100)
full_frame(@103,{Object[#751],Object[#774]},{Object[#854]})
append_frame(@158,Object[#854],Object[#814])

at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:167)
~[hadoop-hdfs-client-3.0.0-cdh6.1.1.jar:?]
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:164)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)

Reply: Unsubscribe

2021-06-17 Thread liuhu1993
Unsubscribe
--
From: Jingsong Li 
Sent: Thursday, June 17, 2021 15:37
To: user-zh 
Subject: Re: Unsubscribe

To unsubscribe, please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 9:51 AM Chongaih Hau 
wrote:

> Mailbox changed, unsubscribe
>
> Regards,
> Hau ChongAih
>


-- 
Best, Jingsong Lee



flink kafka producer avro serializer problem

2021-06-17 Thread Xin Ma
Hello,

Currently, I am using confluent Kafka Avro serializer to write to Kafka,
and in the meanwhile register the schema to confluent schema registry.

The problem is that our upstream data is deserialized from msgpack and
converted to a HashMap, which Avro cannot serialize directly. The map holds
the properties of an event: the key is the field name and the value is the
field value. Since we have many different msgpack upstreams, each representing
one type of event, we don't want to do the serialization the way the official
sample code does, because that approach requires providing an Avro schema file
for each upstream, which is hard for us to manage.

So I got two ideas here,
first, since we can use java reflection to build java POJO from a
json/hashmap, is it also possible to build an avro GenericRecord like that?

second, generate the avro schema by iterating each incoming hashmap's key
and value. I found an implementation here
https://stackoverflow.com/questions/60269983/is-there-any-way-to-generate-an-avro-schema-from-a-hashmap-of-values

Could anyone provide any recommendations on how to implement this?

package com.wish.log.etl.fn;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.msgpack.value.ImmutableValue;

import java.util.HashMap;
import java.util.Map;

public class KafkaGenericAvroSerializationSchema implements
SerializationSchema<Map<String, Object>> {

private final String registryUrl = "";
private transient KafkaAvroSerializer inner;
private final String topic;

public KafkaGenericAvroSerializationSchema (String topic){
this.topic = topic;
}

private void checkInitialized() {
if (inner == null) {
Map<String, Object> props = new HashMap<>();
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
registryUrl);
SchemaRegistryClient client =
new CachedSchemaRegistryClient(
registryUrl,
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
inner = new KafkaAvroSerializer(client, props);
}
}

@Override
public byte[] serialize(Map<String, Object> input) {
// KafkaAvroSerializer is not serializable, needs to be initialized here
checkInitialized();

// official sample codes
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +

"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"int\",\"default\":0}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericData.Record record = new
GenericRecordBuilder(schema).set("f1","kevin")
.set("f2",1234)
.build();

// how to serialize my input

return inner.serialize(topic, input);
}

}

Best,
Kevin
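
For what it's worth, a rough, untested sketch of the second idea (deriving the Avro schema from the map itself). It only handles flat maps with string/int/long/double/boolean values; nested maps, null values, non-identifier field names and the growth of schema versions in the registry would still need thought. The class and method names are made up for illustration.

import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class MapToAvro {

    // Builds a record schema by looking at the runtime type of each value.
    // recordName must be a valid Avro name.
    public static Schema inferSchema(String recordName, Map<String, Object> input) {
        SchemaBuilder.FieldAssembler<Schema> fields =
                SchemaBuilder.record(recordName).fields();
        for (Map.Entry<String, Object> e : input.entrySet()) {
            Object v = e.getValue();
            if (v instanceof Integer) {
                fields = fields.name(e.getKey()).type().intType().noDefault();
            } else if (v instanceof Long) {
                fields = fields.name(e.getKey()).type().longType().noDefault();
            } else if (v instanceof Double) {
                fields = fields.name(e.getKey()).type().doubleType().noDefault();
            } else if (v instanceof Boolean) {
                fields = fields.name(e.getKey()).type().booleanType().noDefault();
            } else {
                // fall back to string for anything else
                fields = fields.name(e.getKey()).type().stringType().noDefault();
            }
        }
        return fields.endRecord();
    }

    // Fills a GenericRecord from the map, stringifying unknown types.
    public static GenericRecord toRecord(Schema schema, Map<String, Object> input) {
        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        for (Map.Entry<String, Object> e : input.entrySet()) {
            Object v = e.getValue();
            boolean primitive = v instanceof Integer || v instanceof Long
                    || v instanceof Double || v instanceof Boolean;
            builder.set(e.getKey(), primitive ? v : String.valueOf(v));
        }
        return builder.build();
    }
}

serialize() in the schema above could then roughly become:
GenericRecord record = MapToAvro.toRecord(MapToAvro.inferSchema("event", input), input);
return inner.serialize(topic, record);
One caveat: every distinct set of fields/types produces a different schema, so a topic with many event shapes will accumulate schema versions in the registry and may trip compatibility checks.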


PyFlink LIST type problem

2021-06-17 Thread László Ciople
Hello,
While trying to use the Pyflink DataStream API in Flink 1.13, I have
encountered an error regarding list types. I am trying to read data from a
Kafka topic that contains events in a json format. For example:
{
"timestamp": 1614259940,
"harvesterID": "aws-harvester",
"clientID": "aws-client-id",
"deviceID": "aws-devid",
"payload": {
"Version": {
"PolicyVersion": {
"Document": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "ec2:*",
"Effect": "Allow",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "elasticloadbalancing:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "cloudwatch:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "autoscaling:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:CreateServiceLinkedRole",
"Resource": "*",
"Condition": {
"StringEquals": {
"iam:AWSServiceName": [
"autoscaling.amazonaws.com",
"ec2scheduled.amazonaws.com",
"elasticloadbalancing.amazonaws.com"
,
"spot.amazonaws.com",
"spotfleet.amazonaws.com",
"transitgateway.amazonaws.com"
]
}
}
}
]
},
"VersionId": "v5",
"IsDefaultVersion": true,
"CreateDate": "2018-11-27 02:16:56+00:00"
},
"ResponseMetadata": {
"RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid":
"6d32c946-1273-4bc5-b465-e5549dc4f515",
"content-type": "text/xml",
"content-length": "2312",
"vary": "accept-encoding",
"date": "Thu, 25 Feb 2021 15:32:18 GMT"
},
"RetryAttempts": 0
}
},
"Policy": {
"Policy": {
"PolicyName": "AmazonEC2FullAccess",
"PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
"Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
"Path": "/",
"DefaultVersionId": "v5",
"AttachmentCount": 2,
"PermissionsBoundaryUsageCount": 0,
"IsAttachable": true,
"Description":
"Provides full access to Amazon EC2 via the AWS Management Console.",
"CreateDate": "2015-02-06 18:40:15+00:00",
"UpdateDate": "2018-11-27 02:16:56+00:00"
},
"ResponseMetadata": {
"RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid":
"a7e9f175-a757-4215-851e-f3d001083631",
"content-type": "text/xml",
"content-length": "866",
"date": "Thu, 25 Feb 2021 15:32:18 GMT"
},
"RetryAttempts": 0
}
}
}
}

I have tried to map this json to Flink data types as follows:
input_type = Types.ROW_NAMED(
['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
[
Types.LONG(),  # timestamp
Types.STRING(),  # harvesterID
Types.STRING(),  # clientID
Types.STRING(),  # deviceID
Types.ROW_NAMED(  # Payload
['Version', 'Policy'],
[
Types.ROW_NAMED(  # Version
['PolicyVersion', 'ResponseMetadata'],
[
Types.ROW_NAMED(  # PolicyVersion
['Document', 'VersionId', 'IsDefaultVersion'
, 'CreateDate'],
[
Types.ROW_NAMED(  # Document
['Version', 'Statement'],
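# (The type declaration above is cut off in the archive.) As a hedged hint: in my
# understanding the tricky part is usually the "Statement" JSON array. A list of JSON
# objects is normally declared as an array of rows, e.g. Types.OBJECT_ARRAY(...), rather
# than a plain list. Since the statements here do not all have the same fields, and
# "Action" is sometimes a string and sometimes an array, a fixed row schema will not fit
# every element; a pragmatic fallback is to declare the whole Document (or Statement) as
# Types.STRING() and parse it inside a map function/UDF. Method names below are from
# PyFlink 1.13 as I remember them - please verify against your version.

from pyflink.common.typeinfo import Types

# Sketch only: a homogeneous array of {"Action", "Effect", "Resource"} objects.
statement_type = Types.OBJECT_ARRAY(
    Types.ROW_NAMED(
        ['Action', 'Effect', 'Resource'],
        [Types.STRING(), Types.STRING(), Types.STRING()]))

document_type = Types.ROW_NAMED(
    ['Version', 'Statement'],
    [Types.STRING(), statement_type])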
 

question about flink on k8s per-job mode

2021-06-17 Thread at003
Hello experts,


Why does the official Flink documentation state that neither Flink on K8s nor native Kubernetes supports per-job mode, while a web search still turns up many tutorials for it...


Thanks

Re: Flink state evolution with avro

2021-06-17 Thread Yun Tang
Hi,

You can refer to the community's state-evolution end-to-end test code [1]; that whole program uses Avro to declare the relevant classes.


[1]
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-state-evolution-test/src/main

Best,
Yun Tang

From: casel.chen 
Sent: Friday, June 11, 2021 8:13
To: user-zh@flink.apache.org 
Subject: Flink state evolution with avro

Is there any live code example about flink state evolution with avro? Thanks!


Re: RocksDB CPU resource usage

2021-06-17 Thread Yun Tang
Hi Padarn,

In my experience, de-/serialization alone might not account for a 3x CPU increase;
background compaction could also drive CPU usage up. You could use
async-profiler [1] to figure out what really consumes your CPU, as it
can also capture the native RocksDB thread stacks.


[1] https://github.com/jvm-profiling-tools/async-profiler
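
For example, something like the following (flag names as I remember them from the async-profiler docs, please verify against your version; <TM_PID> is a placeholder for the TaskManager process id, e.g. taken from jps):

./profiler.sh -e cpu -d 60 -f /tmp/tm-cpu-flamegraph.html <TM_PID>

This samples Java frames and the native RocksDB frames in the same flame graph, so compaction/flush threads show up next to the de-/serialization cost.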

Best
Yun Tang


From: Robert Metzger 
Sent: Thursday, June 17, 2021 14:11
To: Padarn Wilson 
Cc: JING ZHANG ; user 
Subject: Re: RocksDB CPU resource usage

If you are able to execute your job locally as well (with enough data), you can 
also run it with a profiler and see the CPU cycles spent on serialization (you 
can also use RocksDB locally)

On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson 
mailto:pad...@gmail.com>> wrote:
Thanks Robert. I think it would be easy enough to test this hypothesis by 
making the same comparison with some simpler state inside the aggregation 
window.

On Wed, 16 Jun 2021, 7:58 pm Robert Metzger, 
mailto:rmetz...@apache.org>> wrote:
Depending on the datatypes you are using, seeing 3x more CPU usage seems 
realistic.
Serialization can be quite expensive. See also: 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html 
Maybe it makes sense to optimize there a bit.

On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG 
mailto:beyond1...@gmail.com>> wrote:
Hi Padarn,
After switch stateBackend from filesystem to rocksdb, all reads/writes from/to 
backend have to go through de-/serialization to retrieve/store the state 
objects, this may cause more cpu cost.
But I'm not sure it is the main reason leads to 3x CPU cost in your job.
To find out the reason, we need more profile on CPU cost, such as Flame Graphs. 
BTW, starting with Flink 1.13, Flame Graphs are natively supported in Flink[1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/

Best,
JING ZHANG

Padarn Wilson mailto:pad...@gmail.com>> 于2021年6月15日周二 
下午5:05写道:
Hi all,

We have a job that we just enabled rocksdb on (instead of file backend), and 
see that the CPU usage is almost 3x greater on (we had to increase taskmanagers 
3x to get it to run.

I don't really understand this, is there something we can look at to understand 
why CPU use is so high? Our state mostly consists of aggregation windows.

Cheers,
Padarn


Re: Please advise bootstrapping large state

2021-06-17 Thread Timo Walther

Hi Marco,

which operations do you want to execute in the bootstrap pipeline?

Maybe you don't need to use SQL and old planner. At least this would 
simplify the friction by going through another API layer.


The JDBC connector can be used directly with the DataSet API as well.
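
For reference, a rough sketch of reading the table directly with the DataSet API in 1.12. Class names are from the flink-connector-jdbc module as far as I remember them, and the connection values, query, column types and id range are placeholders, so please double-check against your version:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.types.Row;

public class BootstrapRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Parallel ("partitioned") scan: each split reads a slice of the id range.
        JdbcNumericBetweenParametersProvider splits =
                new JdbcNumericBetweenParametersProvider(0L, 10_000_000L)
                        .ofBatchSize(100_000L);

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://host:5432/db")   // placeholder
                .setUsername("user")                          // placeholder
                .setPassword("secret")                        // placeholder
                .setQuery("SELECT id, payload FROM my_table WHERE id BETWEEN ? AND ?")
                .setParametersProvider(splits)
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO))
                .finish();

        DataSet<Row> rows = env.createInput(inputFormat);
        // ... feed `rows` into the State Processor API bootstrap transformation ...
    }
}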

Regards,
Timo



On 17.06.21 07:33, Marco Villalobos wrote:

Thank you very much!

I tried using Flink's SQL JDBC connector, and ran into issues.  
According to the flink documentation, only the old planner is compatible 
with the DataSet API.


When I connect to the table:

CREATE TABLE my_table (

) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = '?',
    'connector.username' = '?',
    'connector.password' = '?',
    'connector.table' = 'my_table'
)

It creates a JdbcTableSource, but only BatchTableSource and 
InputFormatTableSource are supported in BatchTableEnvironment.


By the way, it was very challenging to figure out how to create that 
connection string, because its a different format than what is in the 
documentation. I had to comb through JdbcTableSourceSinkFactory to 
figure out how to connect.


Is it even possible to use the DataSet API with the Table SQL api in 
Flink 1.12.1?



On Wed, Jun 16, 2021 at 4:55 AM Robert Metzger > wrote:


Hi Marco,

The DataSet API will not run out of memory, as it spills to disk if
the data doesn't fit anymore.
Load is distributed by partitioning data.

Giving you advice depends a bit on the use-case. I would explore two
major options:
a) reading the data from postgres using Flink's SQL JDBC connector
[1]. 200 GB is not much data. A 1gb network link needs ~30 minutes
to transfer that (125 megabytes / second)
b) Using the DataSet API and state processor API. I would first try
to see how much effort it is to read the data using the DataSet API
(could be less convenient than the Flink SQL JDBC connector).

[1]

https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/




On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos
mailto:mvillalo...@kineteque.com>> wrote:

I must bootstrap state from postgres (approximately 200 GB of
data) and I notice that the state processor API requires the
DataSet API in order to bootstrap state for the Stream API.

I wish there was a way to use the SQL API and use a partitioned
scan, but I don't know if that is even possible with the DataSet
API.

I never used the DataSet API, and I am unsure how it manages
memory, or distributes load, when handling large state.

Would it run out of memory if I map data from a JDBCInputFormat
into a large DataSet and then use that to bootstrap state for my
stream job?

Any advice on how I should proceed with this would be greatly
appreciated.

Thank you.





Mail from chenxuying

2021-06-17 Thread chenxuying



Re: Flink parameter configuration does not take effect

2021-06-17 Thread Robert Metzger
Hi Jason,

I hope you don't mind that I brought back the conversation to the user@
mailing list, so that others can benefit from the information as well.

Thanks a lot for sharing your use case. I personally believe that Flink
should support invocations like "flink run -m yarn-cluster
xxx.FlinkStreamSQLDDLJob flink-stream-sql-ddl-1.0.0.jar ./config.json".
There is no fundamental reason why this can not be supported.

The Javadoc about tableEnv.getConfig() mentions that the config is only
about the "runtime behavior":
https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L1151
... but I see how this is not clearly defined.

As a short-term fix, I've proposed to clarify in the configuration table
which options are cluster vs job configurations:
https://issues.apache.org/jira/browse/FLINK-22257.

But in the long term, we certainly need to improve the user experience.
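
To make the distinction concrete for the invocation discussed in this thread (all values below are placeholders, not tested): cluster-level options such as TaskManager memory have to be in flink-conf.yaml or passed when the per-job cluster is created, roughly like

flink run -m yarn-cluster \
  -yD taskmanager.memory.managed.fraction=0.2 \
  -yD taskmanager.memory.process.size=4096m \
  -c xxx.FlinkStreamSQLDDLJob flink-stream-sql-ddl-1.0.0.jar ./config.json

while runtime/planner options (for example table.exec.mini-batch.enabled) can still be set per job from inside the main class via tableEnv.getConfig().getConfiguration().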


On Wed, Jun 16, 2021 at 3:31 PM Jason Lee  wrote:

> Dear Robert,
>
> For tasks running on the cluster, some parameter configurations are
> global, but some parameter configurations need to be customized, such as
> some memory settings of TaskManager. For tasks with different state sizes,
> we need to configure different parameters. These parameters should not  be
> configured in flink-config.yaml. But for the current Flink, these
> parameters cannot be configured through StreamExecutionEnvironment, and
> some parameters are not effective if set through StreamTableEnvironment.
>
> At the same time, Configuration is immutable after the task is started,
> which is correct, but I think some global parameters should also be
> specified in StreamExecutionEnvironment. At present, some parameters of
> checkpoint are also set globally, but they can be set through
> "StreamExecutionEnvironment .getCheckpointConfig().set()", then why can't
> the parameters of TaskManager's memory be set in this way? I think that
> setting the global parameters by "flink run -yD" is the same as setting by
> "StreamExecutionEnvironment". I am not sure if I understand it correctly.
>
> I agree with you. I think we need to specify in the configuration of the
> official document that those parameters are best configured in
> flink-config.yaml. Those parameters can be modified in
> "StreamExecutionEnvironment", and those can only be passed through others
> Modified in the way. I think the document will be clearer for users.
>
> Best,
> Jason
> JasonLee1781
> jasonlee1...@163.com
>
> 
>
> On 06/16/2021 21:04,Jason Lee 
> wrote:
>
> Dear Robert,
>
> Thanks for your answer
>
> Our Flink SQL task is deployed through Per job.
>
> We provide our users with a platform for developing Flink SQL tasks. We
> will write the user's SQL code and configuration parameters into a
> Config.json file. At the same time, we develop a Flink Jar task at the
> bottom to actually execute the user's SQL through the command line. To
> perform this task, for example, the following is our instruction to start a
> Flink SQL task: "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob
> flink-stream-sql-ddl-1.0.0.jar ./config.json". In order to facilitate the
> user's personalized configuration parameters, we want to set user
> configuration parameters in the execution environment of the
> FlinkStreamSQLDDLJob class that we have implemented, such as the
> "taskmanager.memory.managed.fraction" parameter, but it is currently
> impossible to configure through the Flink execution environment These
> parameters, because they are not effective, can only be configured by flink
> run -yD.
>
> I think the configuration in the official document states that those
> parameters cannot be set through
> "StreamTableEnvironment.getConfig.getConfiguration().set()", but can only
> be set through flink run -yD or configured in flink-conf.yaml. If the
> current document does not explain it, it will not take effect if you use
> the "StreamTableEnvironment.getConfig.getConfiguration().set()" method to
> set some parameters. In order to increase the use of personalized
> configuration parameters for users, I think these instructions can appear
> in the Configuration of the official document.
>
> Best,
> Jason
>
> JasonLee1781
> jasonlee1...@163.com
>
> 
>
> On 06/16/2021 18:37,Robert Metzger
>  wrote:
>
> Hi Jason,
>
> How are you deploying your Flink SQL tasks? (are you using
> 

flink 1.11.2: how to limit the memory of the PyFlink UDF Python worker?

2021-06-17 Thread Peihui He
Hi, all

I have a UDF written in Python that wraps a model prediction. When I submit the SQL job to a Flink session cluster, it keeps getting killed by the container runtime.
TaskManager command-line arguments:
sun.java.command = org.apache.flink.runtime.taskexecutor.TaskManagerRunner
-Djobmanager.rpc.address=10.50.56.253 --configDir /opt/flink-1.11.2/conf -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=321011060b -D
taskmanager.memory.network.min=321011060b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=321011060b -D taskmanager.cpu.cores=4.0 -D
taskmanager.memory.task.heap.size=2299652985b -D
taskmanager.memory.task.off-heap.size=0b

[image: image.png]

python.fn-execution.framework.memory.size 450mb

I configured the parameter above, but it has no effect.

What could be the reason?


Re: Unsubscribe

2021-06-17 Thread Jingsong Li
To unsubscribe, please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 2:16 PM 金晓龙  wrote:

> Unsubscribe



-- 
Best, Jingsong Lee


Re: Unsubscribe

2021-06-17 Thread Jingsong Li
To unsubscribe, please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 9:51 AM Chongaih Hau 
wrote:

> Mailbox changed, unsubscribe
>
> Regards,
> Hau ChongAih
>


-- 
Best, Jingsong Lee


Re: Mail unsubscribe

2021-06-17 Thread Jingsong Li
To unsubscribe, please send to user-zh-unsubscr...@flink.apache.org instead of user-zh@flink.apache.org

Best,
Jingsong

On Thu, Jun 17, 2021 at 9:29 AM wangweigu...@stevegame.cn <
wangweigu...@stevegame.cn> wrote:

>
> Mailbox changed, unsubscribe!
>
>
>
>

-- 
Best, Jingsong Lee


Re: Re:Re: Re: Re:Re: flink sql job fails when submitted to yarn

2021-06-17 Thread yangpengyi
Has this problem been solved? I hit the same error when submitting to a YARN cluster with Flink in yarn-per-job mode.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-17 Thread Arvid Heise
Hi Chirag,

Which Flink version are you using? As far as I understand, the issue is
appearing just by writing the initial data - no recovery happened right?

Could you try to change the code such that you only have a single
read/update on the state? It should work as you have done it but I'd like
to pinpoint the issue further.

On Thu, Jun 10, 2021 at 8:25 AM Yun Gao  wrote:

> Hi Chirag,
>
> Logically Integer type should not have this issue. Sorry that from the
> current description I
> have not found other issues, could you also share the code in the main
> method that
> adds the KeyProcessFunction into the job ? Very thanks!
>
> Best,
> Yun
>
> --
> From:Chirag Dewan 
> Send Time:2021 Jun. 9 (Wed.) 15:15
> To:User ; Yun Gao 
> Subject:Re: Multiple Exceptions during Load Test in State Access APIs with
> RocksDB
>
> Thanks for the reply Yun.
>
> The key is an Integer type. Do you think there can be hash collisions for
> Integers?
>
> It somehow works on single TM now. No errors for 1m records.
> But as soon as we move to 2 TMs, we get all sort of errors - 'Position Out
> of Bound', key not in Keygroup etc.
>
> This also causes a NPE in the user defined code -
>
> if (valueState != null)
> valueState.value() -> This causes Null, so while the if check passed,
> it caused an NPE while reading the value.
>
> Thanks,
> Chirag
>
> On Tuesday, 8 June, 2021, 08:29:04 pm IST, Yun Gao 
> wrote:
>
>
> Hi Chirag,
>
> As far as I know, If you are running a single job, I think all th pods
> share the same
> state.checkpoints.dir configuration should be as expected, and it is not
> necessary
> to configuraiton the rocksdb local dir since Flink will chosen a default
> dir.
>
> Regarding the latest exception, I think you might first check the key type
> used and
> the key type should has a stable hashcode method.
>
> Best,
> Yun
>
>
>
> --Original Mail --
> *Sender:*Chirag Dewan 
> *Send Date:*Tue Jun 8 18:06:07 2021
> *Recipients:*User , Yun Gao 
> *Subject:*Re: Multiple Exceptions during Load Test in State Access APIs
> with RocksDB
> Hi,
>
> Although this looks like a problem to me, I still cant conclude it.
>
> I tried reducing my TM replicas from 2 to 1 with 4 slots and 4 cores each.
> I was hoping that with single TM there will be file write conflicts. But
> that doesn't seem to be the case as still get the:
>
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.lang.IllegalArgumentException: Key group 2 is not in
> KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.
>
> I have checked that there's no concurrent access on the ValueState.
>
> Any more leads?
>
> Thanks,
> Chirag
>
>
> On Monday, 7 June, 2021, 06:56:56 pm IST, Chirag Dewan <
> chirag.dewa...@yahoo.in> wrote:
>
>
> Hi,
>
> I think I got my issue. Would help if someone can confirm it :)
>
> I am using a NFS filesystem for storing my checkpoints and my Flink
> cluster is running on a K8 with 2 TMs and 2 JMs.
>
> All my pods share the NFS PVC with state.checkpoint.dir and we also missed
> setting the RocksDB local dir.
>
> Does this lead to state corruption?
>
> Thanks,
> Chirag
>
>
>
> On Monday, 7 June, 2021, 08:54:39 am IST, Chirag Dewan <
> chirag.dewa...@yahoo.in> wrote:
>
>
> Thanks for the reply Yun. I strangely don't see any nulls. And infact this
> exception comes on the first few records and then job starts processing
> normally.
>
> Also, I don't see any reason for Concurrent access to the state in my
> code. Could more CPU cores than task slots to the Task Manager be the
> reason for it?
>
> On Saturday, 5 June, 2021, 06:40:27 pm IST, Yun Gao 
> wrote:
>
>
> Hi Chirag,
>
> If be able to produce the exception, could you first add some logs to print
> the value of valueState, valueState.value(), inEvent and
> inEvent.getPriceDelta() ?
> I think either object being null would cause NullPointerException here.
>
> For the second exception, I found a similar issue[1], caused by concurrent
> access to the value state. Do we have the similar situation here ?
>
> Best,
> Yun
>
> [1] https://issues.apache.org/jira/browse/FLINK-18587
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Chirag Dewan 
> *Send Date:*Sat Jun 5 20:29:37 2021
> *Recipients:*User 
> *Subject:*Multiple Exceptions during Load Test in State Access APIs with
> RocksDB
> Hi,
>
> I am getting multiple exceptions while trying to use RocksDB as astate
> backend.
>
> I have 2 Task Managers with 2 taskslots and 4 cores each.
>
> Below is our setup:
>
>
>
> Kafka(Topic with 2 partitions) ---> FlinkKafkaConsumer(2Parallelism) >
> KeyedProcessFunction(4 Parallelism) > FlinkKafkaProducer(1Parallelism)
> > KafkaTopic
>
>
>
> public class Aggregator_KeyedExpression
> extends KeyedProcessFunction {
>
>
>
> private ValueState valueState;
>
>
>
> @Override
>
> public void open() throws Exception {
>
> 

Re:Re: How to use Kafka's KafkaDynamicTableFactory but replace its json format

2021-06-17 Thread Michael Ran
I originally wanted to get them from the DeserializationFormat; if that is not possible, getting them in the downstream SQL would also be fine.
On 2021-06-17 14:41:55, "Jingsong Li"  wrote:
>No, not unless you create a new Kafka connector yourself.
>
>However,
>Kafka's offset, partition and other information can be obtained through metadata columns.
>
>Do you need the offset/partition inside the DeserializationFormat, or is it enough for the downstream SQL to get them?
>
>Best,
>Jingsong
>
>On Thu, Jun 17, 2021 at 2:35 PM Michael Ran  wrote:
>
>> dear all:
>> I have a small requirement: because the binlog data differs, it is not convenient to use format="json" directly, so I want to implement a custom format to handle it.
>> However, after implementing a custom format via "implements DeserializationFormatFactory,
>> SerializationFormatFactory",
>> I can only process the raw binlog data, while I also want Kafka's offset, partition and other information. I see that Kafka's native DynamicKafkaDeserializationSchema
>> has a method
>> deserialize(ConsumerRecord record,
>> Collector collector),
>> and the ConsumerRecord object wraps the offset.
>> I would like to use that as well, or replace that part with my custom format. Can this be done, or do I have to define my own connector?
>>
>
>
>
>-- 
>Best, Jingsong Lee


Re:Re: Re: Re: Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-17 Thread 东东
If both of these are enabled, the timestamps in connection requests coming from the same source IP must be increasing; otherwise the (non-increasing) connection requests are treated as invalid and the packets are dropped, which from the client side looks like intermittent connection timeouts.



Generally a single machine does not have this problem, since there is only one clock. It tends to show up behind NAT (because the clocks of multiple hosts are usually not perfectly in sync), but I don't know your exact architecture, so all I can say is give it a try.


Finally, you may want to discuss this with your ops team: unless you are sure no connections come in through NAT, it is better not to enable both of them.


PS: tcp_tw_reuse has already been dropped in kernel 4.1, because too many people fell into this trap.
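
For reference, a quick way to check and, if you decide to, change these (needs root; the values are only what was suggested above, and on kernels 4.12+ net.ipv4.tcp_tw_recycle no longer exists):

sysctl net.ipv4.tcp_timestamps net.ipv4.tcp_tw_reuse net.ipv4.tcp_tw_recycle
sysctl -w net.ipv4.tcp_tw_recycle=0   # or net.ipv4.tcp_timestamps=0; persist in /etc/sysctl.conf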


On 2021-06-17 14:07:50, "yidan zhao"  wrote:
>What is the rationale behind this? I can't change this setting directly myself; I need to apply for approval.
>
>东东  wrote on Thu, Jun 17, 2021 at 1:36 PM:
>>
>>
>>
>> Set one of them to 0.
>>
>>
>> On 2021-06-17 13:11:01, "yidan zhao"  wrote:
>> >Yes, it's the host machine's IP.
>> >
>> >net.ipv4.tcp_tw_reuse = 1
>> >net.ipv4.tcp_timestamps = 1
>> >
>> >东东  wrote on Thu, Jun 17, 2021 at 12:52 PM:
>> >>
>> >> Is 10.35.215.18 the host machine's IP?
>> >>
>> >> Check what values tcp_tw_recycle and net.ipv4.tcp_timestamps have.
>> >> If all else fails, use tcpdump.
>> >>
>> >>
>> >>
>> >> On 2021-06-17 12:41:58, "yidan zhao"  wrote:
>> >> >@东东 It's a standalone cluster. The errors happen at random times, one every now and then, with no fixed pattern. There seems to be some correlation with CPU, memory and network, but I can't confirm it because it is not very obvious.
>> >> >I checked several of the exceptions; some of them line up in time with network spikes, but not all of them do, so it's hard to say whether that is the cause.
>> >> >
>> >> >Besides, there is one point I'm not clear about: this error is rarely reported online; similar reports are all
>> >> >RemoteTransportException, with a hint that the TaskManager may have been lost. Mine, however, is
>> >> >LocalTransportException, and I'm not sure whether these two errors mean different things in netty. So far I can't find much material about either exception online.
>> >> >
>> >> >东东  wrote on Thu, Jun 17, 2021 at 11:19 AM:
>> >> >>
>> >> >> Single-machine standalone, or Docker/K8s?
>> >> >>
>> >> >>
>> >> >>
>> >> >> When does this exception occur: periodically, or correlated with changes in CPU, memory, or even network traffic?
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 2021-06-16 19:10:24, "yidan zhao"  wrote:
>> >> >> >Hi, yingjie.
>> >> >> >If the network is not stable, which config parameter I should adjust.
>> >> >> >
>> >> >> >yidan zhao  于2021年6月16日周三 下午6:56写道:
>> >> >> >>
>> >> >> >> 2: I use G1, and no full gc occurred, young gc count: 422, time:
>> >> >> >> 142892, so it is not bad.
>> >> >> >> 3: stream job.
>> >> >> >> 4: I will try to config taskmanager.network.retries which is default
>> >> >> >> 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
>> >> >> >> is 120s。
>> >> >> >> 5: I checked the net fd number of the taskmanager, it is about 
>> >> >> >> 1000+,
>> >> >> >> so I think it is a reasonable value.
>> >> >> >>
>> >> >> >> 1: can not be sure.
>> >> >> >>
>> >> >> >> Yingjie Cao  于2021年6月16日周三 下午4:34写道:
>> >> >> >> >
>> >> >> >> > Hi yidan,
>> >> >> >> >
>> >> >> >> > 1. Is the network stable?
>> >> >> >> > 2. Is there any GC problem?
>> >> >> >> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for 
>> >> >> >> > more information.
>> >> >> >> > 4. You may try to config these two options: 
>> >> >> >> > taskmanager.network.retries, 
>> >> >> >> > taskmanager.network.netty.client.connectTimeoutSec. More relevant 
>> >> >> >> > options can be found in 'Data Transport Network Stack' section of 
>> >> >> >> > [2].
>> >> >> >> > 5. If it is not the above cases, it is may related to [3], you 
>> >> >> >> > may need to check the number of tcp connection per TM and node.
>> >> >> >> >
>> >> >> >> > Hope this helps.
>> >> >> >> >
>> >> >> >> > [1] 
>> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
>> >> >> >> > [2] 
>> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
>> >> >> >> > [3] https://issues.apache.org/jira/browse/FLINK-22643
>> >> >> >> >
>> >> >> >> > Best,
>> >> >> >> > Yingjie
>> >> >> >> >
>> >> >> >> > yidan zhao  于2021年6月16日周三 下午3:36写道:
>> >> >> >> >>
>> >> >> >> >> Attachment is the exception stack from flink's web-ui. Does 
>> >> >> >> >> anyone
>> >> >> >> >> have also met this problem?
>> >> >> >> >>
>> >> >> >> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 
>> >> >> >> >> containers,
>> >> >> >> >> each 28G mem.


Re: How to use Kafka's KafkaDynamicTableFactory but replace its json format

2021-06-17 Thread Jingsong Li
No, not unless you create a new Kafka connector yourself.

However,
Kafka's offset, partition and other information can be obtained through metadata columns.

Do you need the offset/partition inside the DeserializationFormat, or is it enough for the downstream SQL to get them?
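
For the latter case, a minimal sketch of a DDL that exposes the Kafka metadata as columns (topic, servers and the payload format are placeholders):

CREATE TABLE kafka_source (
  `partition` BIGINT METADATA VIRTUAL,
  `offset`    BIGINT METADATA VIRTUAL,
  `ts`        TIMESTAMP(3) METADATA FROM 'timestamp',
  `payload`   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'raw'  -- or your custom format
);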

Best,
Jingsong

On Thu, Jun 17, 2021 at 2:35 PM Michael Ran  wrote:

> dear all :
> I have a small requirement: because the binlog data differs, it is not convenient to use format="json" directly, so I want to implement a custom format to handle it.
> However, after implementing a custom format via "implements DeserializationFormatFactory,
> SerializationFormatFactory",
> I can only process the raw binlog data, while I also want Kafka's offset, partition and other information. I see that Kafka's native DynamicKafkaDeserializationSchema
> has a method
> deserialize(ConsumerRecord record,
> Collector collector),
> and the ConsumerRecord object wraps the offset.
> I would like to use that as well, or replace that part with my custom format. Can this be done, or do I have to define my own connector?
>



-- 
Best, Jingsong Lee


How to use Kafka's KafkaDynamicTableFactory but replace its json format

2021-06-17 Thread Michael Ran
dear all:
I have a small requirement: because the binlog data differs, it is not convenient to use format="json" directly, so I want to implement a custom format to handle it.
However, after implementing a custom format via "implements DeserializationFormatFactory,
SerializationFormatFactory",
I can only process the raw binlog data, while I also want Kafka's offset, partition and other information. I see that Kafka's native DynamicKafkaDeserializationSchema
has a method
deserialize(ConsumerRecord record, Collector
collector),
and the ConsumerRecord object wraps the offset.
I would like to use that as well, or replace that part with my custom format. Can this be done, or do I have to define my own connector?
  

Re: Diagnosing bottlenecks in Flink jobs

2021-06-17 Thread JING ZHANG
Hi Dan,
It's better to split the Kafka partition into multiple partitions.
Here is a way to try without splitting the Kafka partition. Add a rebalance
shuffle between source and the downstream operators, set multiple
parallelism for the downstream operators. But this way would introduce
extra cpu cost for serialize/deserialize and extra network cost for shuffle
data. I'm not sure the benefits of this method can offset the additional
costs.
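
A minimal sketch of that idea (operator names, types and parallelism values are placeholders):

DataStream<String> events = env
        .addSource(kafkaConsumer)   // effectively 1 subtask because of the single Kafka partition
        .setParallelism(1)
        .rebalance();               // round-robin the records to all downstream subtasks

events
        .map(new HeavyMapFunction())
        .setParallelism(8)          // the expensive work is now shared by 8 subtasks
        .addSink(sink);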

Best,
JING ZHANG

Dan Hill  于2021年6月17日周四 下午1:49写道:

> Thanks, JING ZHANG!
>
> I have one subtask for one Kafka source that is getting backpressure.  Is
> there an easy way to split a single Kafka partition into multiple
> subtasks?  Or do I need to split the Kafka partition?
>
> On Wed, Jun 16, 2021 at 10:29 PM JING ZHANG  wrote:
>
>> Hi Dan,
>> Would you please describe what's the problem about your job? High latency
>> or low throughput?
>> Please first check the job throughput and latency .
>> If the job throughput matches the speed of sources producing data and the
>> latency metric is good, maybe the job works well without bottlenecks.
>> If you find unnormal throughput or latency, please try the following
>> points:
>> 1. check the back pressure
>> 2. check whether checkpoint duration is long and whether the checkpoint
>> size is expected
>>
>> Please share the details for deeper analysis in this email if you find
>> something abnormal about  the job.
>>
>> Best,
>> JING ZHANG
>>
>> Dan Hill  于2021年6月17日周四 下午12:44写道:
>>
>>> We have a job that has been running but none of the AWS resource metrics
>>> for the EKS, EC2, MSK and EBS show any bottlenecks.  I have multiple 8
>>> cores allocated but only ~2 cores are used.  Most of the memory is not
>>> consumed.  MSK does not show much use.  EBS metrics look mostly idle.  I
>>> assumed I'd be able to see whichever resources is a bottleneck.
>>>
>>> Is there a good way to diagnose where the bottleneck is for a Flink job?
>>>
>>


Re: Resource Planning

2021-06-17 Thread Robert Metzger
Hi,

since your state (150gb) seems to fit into memory (700gb), I would
recommend trying the HashMapStateBackend:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-hashmapstatebackend
(unless you know that your state size is going to increase a lot soon).
But I guess you'll have a nice performance improvement.
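
For example (Flink 1.13 API as far as I recall it; the checkpoint path is a placeholder, and on older versions the rough equivalent is FsStateBackend):

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());                        // keep working state on the heap
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/chk");     // checkpoints stay durable

The same can be expressed in flink-conf.yaml via state.backend: hashmap and state.checkpoints.dir.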

At the moment I have no idea where else to look for the issue you are
describing, but it seems that there are a few things for you to try out to
optimize the resource allocation.

On Wed, Jun 16, 2021 at 7:23 PM Rommel Holmes 
wrote:

> Hi, Xintong and Robert
>
> Thanks for the reply.
>
> The checkpoint size for our job is 10-20GB since we are doing incremental
> checkpointing, if we do a savepoint, it can be as big as 150GB.
>
> 1) We will try to make Flink instance bigger.
> 2) Thanks for the pointer, we will take a look.
>
> 3) We do have CPU and memory monitoring, when it is backpressure, the CPU
> load increases from 25% to 50% with more spiky shape, but it is not 100%.
> As for memory, we monitored (Heap.Committed - Heap.Used) per host, when
> backpressure happened, the memory on host is still 500MB ish.
>
> What we observed is that when backpressure happened, the read state time
> slowness happened on one of the hosts, and on different task managers on
> this host. The read state time (one metrics we create and measure) on that
> host shoots up, from 0.x ms to 40-60 ms.
>
> We also observed that when this happens, the running compaction time for
> RocksDB on that host gets longer, from 1 minutes to over 2 minutes. other
> hosts are still 1minute ish.
>
> We also observed that when this happens, size of the active and unflushed
> immutable memtables metrics increased not as fast as before the
> backpressure.
>
> I can provide more context if you are interested. We are still debugging
> on this issue.
>
> Rommel
>
>
>
>
>
> On Wed, Jun 16, 2021 at 4:25 AM Robert Metzger 
> wrote:
>
>> Hi Thomas,
>>
>> My gut feeling is that you can use the available resources more
>> efficiently.
>>
>> What's the size of a checkpoint for your job (you can see that from the
>> UI)?
>>
>> Given that your cluster has has an aggregate of 64 * 12 = 768gb of memory
>> available, you might be able to do everything in memory (I might be off by
>> a few terabytes here, it all depends on your state size ;) )
>>
>> 1. In my experience, it is usually more efficient to have a few large
>> Flink instances than many small ones. Maybe try to run 12 TaskManagers (or
>> 11 to make the JM fit) with 58gb of memory (the JM can stick to the 7gb)
>> and see how Flink behaves.
>>
>> 2. I'd say it's a try and see process, with a few educated guesses. Maybe
>> check out this:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>> to get some inspiration for making some "back of the napkin" calculations
>> on the sizing requirements.
>>
>> 3. Do you have some monitoring of CPU / memory / network usage in place?
>> It would be interesting to see what the mentrics look like when
>> everything is ok vs when the job is backpressured.
>>
>> Best,
>> Robert
>>
>>
>> On Wed, Jun 16, 2021 at 3:56 AM Xintong Song 
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> It would be helpful if you can provide the jobmanager/taskmanager logs,
>>> and gc logs if possible.
>>>
>>> Additionally, you may consider to monitor the cpu/memory related metrics
>>> [1], see if there's anything abnormal when the problem is observed.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html
>>>
>>>
>>>
>>> On Wed, Jun 16, 2021 at 8:11 AM Thomas Wang  wrote:
>>>
 Hi,

 I'm trying to see if we have been given enough resources (i.e. CPU and
 memory) to each task node to perform a deduplication job. Currently, the
 job is not running very stable. What I have been observing is that after a
 couple of days run, we will suddenly see backpressure happen on one
 arbitrary ec2 instance in the cluster and when that happens, we will have
 to give up the current state and restart the job with an empty state. We
 can no longer take savepoint as it would timeout after 10 minutes, which is
 understandable.

 Additional Observations

 When the backpressure happens, we see an increase in our state read
 time (we are measuring it using a custom metric) from about 0.1
 milliseconds to 40-60 milliseconds on that specific problematic ec2
 instance. We tried to reboot that ec2 instance, so that the corresponding
 tasks would be assigned to a different ec2 instance, but the problem
 persists.

 However, I’m not sure if this read time increase is a symptom or the
 cause of the problem.

 Background about this deduplication job:

 We are making sessionization with deduplication on an event stream by a
 session key that is embedded 

Unsubscribe

2021-06-17 Thread 金晓龙
Unsubscribe

Re: RocksDB CPU resource usage

2021-06-17 Thread Robert Metzger
If you are able to execute your job locally as well (with enough data), you
can also run it with a profiler and see the CPU cycles spent on
serialization (you can also use RocksDB locally)

On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson  wrote:

> Thanks Robert. I think it would be easy enough to test this hypothesis by
> making the same comparison with some simpler state inside the aggregation
> window.
>
> On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:
>
>> Depending on the datatypes you are using, seeing 3x more CPU usage seems
>> realistic.
>> Serialization can be quite expensive. See also:
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> Maybe it makes sense to optimize there a bit.
>>
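One cheap way to start is to make any Kryo fallback visible, since generic/Kryo serialization is usually the expensive path; a sketch, not tied to this particular job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast if any type in the pipeline falls back to Kryo instead of the
        // POJO/Tuple serializers.
        env.getConfig().disableGenericTypes();
        // If a type genuinely needs Kryo, registering it up front avoids writing
        // full class names with every record, e.g.:
        // env.getConfig().registerKryoType(MyEventClass.class);   // placeholder class
    }
}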
>> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>>
>>> Hi Padarn,
>>> After switching the state backend from filesystem to rocksdb, all reads/writes
>>> from/to the backend have to go through de-/serialization to retrieve/store the
>>> state objects, which may cause higher CPU cost.
>>> But I'm not sure it is the main reason for the 3x CPU cost in your job.
>>> To find out the reason, we need more profiling of the CPU cost, for example Flame
>>> Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively supported
>>> in Flink [1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>>
>>> Best,
>>> JING ZHANG
>>>
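For anyone trying the flame graphs mentioned above: they are disabled by default in 1.13 and, as far as I know, only need one flag in flink-conf.yaml:

rest.flamegraph.enabled: true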
>>> On Tue, Jun 15, 2021 at 5:05 PM Padarn Wilson  wrote:
>>>
 Hi all,

 We have a job that we just enabled rocksdb on (instead of the file
 backend), and see that the CPU usage is almost 3x greater (we had to
 increase the number of taskmanagers 3x to get it to run).

 I don't really understand this, is there something we can look at to
 understand why CPU use is so high? Our state mostly consists of aggregation
 windows.

 Cheers,
 Padarn

>>>


Re: Re: Re: Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-17 Thread yidan zhao
What is the rationale behind this? I can't make this change directly myself; I need to apply for approval first.

On Thu, Jun 17, 2021 at 1:36 PM 东东  wrote:
>
>
>
> Set one of them to 0.
>
>
> On 2021-06-17 13:11:01, "yidan zhao"  wrote:
> >Yes, it is the host machine's IP.
> >
> >net.ipv4.tcp_tw_reuse = 1
> >net.ipv4.tcp_timestamps = 1
> >
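For later readers: the combination that usually motivates this kind of advice is net.ipv4.tcp_tw_recycle=1 together with net.ipv4.tcp_timestamps=1, which can silently drop connections from peers behind NAT (tcp_tw_reuse=1 on its own is generally harmless). If a change really is needed, it would typically look like the following in /etc/sysctl.conf, reloaded with sysctl -p; which knob to touch depends on the actual values on these hosts:

net.ipv4.tcp_tw_recycle = 0
# keeping net.ipv4.tcp_timestamps = 1 is normally preferable to disabling timestamps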
> >On Thu, Jun 17, 2021 at 12:52 PM 东东  wrote:
> >>
> >> Is 10.35.215.18 the host machine's IP?
> >>
> >> Check the values of tcp_tw_recycle and net.ipv4.tcp_timestamps.
> >> If that doesn't get you anywhere, fall back to tcpdump.
> >>
> >>
> >>
> >> On 2021-06-17 12:41:58, "yidan zhao"  wrote:
> >> >@东东 It is a standalone cluster. The errors occur at random times, one every
> >> >once in a while, with no fixed pattern. There seems to be some correlation with
> >> >CPU, memory, and network, but I can't confirm it because it is not very obvious.
> >> >I investigated several exceptions whose timestamps lined up with network spikes,
> >> >but not all of them did, so it's hard to say whether that is the cause.
> >> >
> >> >Also, one thing I'm not clear about: there are very few reports of this error
> >> >online, and the similar ones are all RemoteTransportException, with a hint that
> >> >the taskmanager may have been lost. Mine, however, is LocalTransportException,
> >> >and I'm not sure whether these two errors mean different things in Netty. So far
> >> >I haven't been able to find much material about either exception online.
> >> >
> >> >On Thu, Jun 17, 2021 at 11:19 AM 东东  wrote:
> >> >>
> >> >> Single-machine standalone, or Docker/K8s?
> >> >>
> >> >>
> >> >>
> >> >> When does this exception occur? Is it periodic, or is it correlated with changes in CPU, memory, or even network traffic?
> >> >>
> >> >>
> >> >>
> >> >> On 2021-06-16 19:10:24, "yidan zhao"  wrote:
> >> >> >Hi, yingjie.
> >> >> >If the network is not stable, which config parameter should I adjust?
> >> >> >
> >> >> >yidan zhao  于2021年6月16日周三 下午6:56写道:
> >> >> >>
> >> >> >> 2: I use G1, and no full gc occurred; young gc count: 422, time:
> >> >> >> 142892, so it is not bad.
> >> >> >> 3: It is a streaming job.
> >> >> >> 4: I will try to configure taskmanager.network.retries, which defaults to
> >> >> >> 0; taskmanager.network.netty.client.connectTimeoutSec's default
> >> >> >> is 120s.
> >> >> >> 5: I checked the number of network fds of the taskmanager; it is about 1000+,
> >> >> >> so I think it is a reasonable value.
> >> >> >>
> >> >> >> 1: can not be sure.
> >> >> >>
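For reference, the corresponding flink-conf.yaml entries would look roughly like this; the values are illustrative, not recommendations:

taskmanager.network.retries: 3
taskmanager.network.netty.client.connectTimeoutSec: 300
# more knobs are listed under 'Data Transport Network Stack' in the configuration docs [2]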
> >> >> >> On Wed, Jun 16, 2021 at 4:34 PM Yingjie Cao  wrote:
> >> >> >> >
> >> >> >> > Hi yidan,
> >> >> >> >
> >> >> >> > 1. Is the network stable?
> >> >> >> > 2. Is there any GC problem?
> >> >> >> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for 
> >> >> >> > more information.
> >> >> >> > 4. You may try to configure these two options:
> >> >> >> > taskmanager.network.retries,
> >> >> >> > taskmanager.network.netty.client.connectTimeoutSec. More relevant
> >> >> >> > options can be found in the 'Data Transport Network Stack' section of
> >> >> >> > [2].
> >> >> >> > 5. If it is none of the above cases, it may be related to [3]; you may
> >> >> >> > need to check the number of tcp connections per TM and node.
> >> >> >> >
> >> >> >> > Hope this helps.
> >> >> >> >
> >> >> >> > [1] 
> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> >> >> >> > [2] 
> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> >> >> >> > [3] https://issues.apache.org/jira/browse/FLINK-22643
> >> >> >> >
> >> >> >> > Best,
> >> >> >> > Yingjie
> >> >> >> >
> >> >> >> > On Wed, Jun 16, 2021 at 3:36 PM yidan zhao  wrote:
> >> >> >> >>
> >> >> >> >> Attachment is the exception stack from flink's web-ui. Does anyone
> >> >> >> >> have also met this problem?
> >> >> >> >>
> >> >> >> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 
> >> >> >> >> containers,
> >> >> >> >> each 28G mem.


Re: Re: Re: Upgrade job topology in checkpoint

2021-06-17 Thread Padarn Wilson
Thanks Yun,

Agreed, it seemed unlikely to be state; I just wanted to confirm that this
was unexpected before ruling it out.

Thanks,
Padarn

On Thu, Jun 17, 2021 at 10:45 AM Yun Gao  wrote:

> Hi Padarn,
>
> From the current description it seems to me that the issue is not
> related to
> the state? I think we may first check whether the operator logic is right and
> whether
> the upstream tasks have indeed emitted records to the new sink.
>
> Best,
> Yun
>
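One generic thing worth double-checking when a topology change seems to be silently ignored (it may or may not apply here): give stateful operators stable uids so the snapshot maps onto the new graph as intended, and attach the new sink explicitly to the stream it should consume. A hypothetical fragment, with all names made up:

DataStream<Event> sessions = events
        .keyBy(e -> e.getSessionKey())
        .process(new SessionDedupFunction())
        .uid("session-dedup");                 // stable uid: snapshot state maps onto this operator

sessions.addSink(existingSink).uid("existing-sink");
sessions.addSink(newSink).uid("new-sink");     // newly added operator, simply starts with empty state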
> --Original Mail --
> *Sender:*Padarn Wilson 
> *Send Date:*Wed Jun 16 12:27:43 2021
> *Recipients:*Yun Gao , user 
> *Subject:*Re: Re: Upgrade job topology in checkpoint
>
>> We added a new sink to the job graph and redeployed, but the new sink
>> did not receive any records, as though it were not connected to the graph
>> (possibly it was a code bug, but I was trying to understand whether this makes
>> sense given the implementation).
>>
>> re-including mailing list, excluded by accident
>>
>> Padarn
>>
>> On Wed, Jun 16, 2021 at 10:59 AM Yun Gao  wrote:
>>
>>> Hi Padarn,
>>>
>>> Sorry, I might not have fully understood what "the new topology was ignored" means.
>>> Do you mean the topology is not the same as expected?
>>>
>>> Best,
>>> Yun
>>>
>>>
>>> --Original Mail --
>>> *Sender:*Padarn Wilson 
>>> *Send Date:*Tue Jun 15 21:45:17 2021
>>> *Recipients:*Yun Gao 
>>> *Subject:*Re: Upgrade job topology in checkpoint
>>>
 Thanks Yun,

 Yes, we do indeed retain checkpoints, but we were unable to restore with
 the new topology from them for some reason. It seemed like the new topology was
 totally ignored, which was surprising to me.

 Padarn

 On Tue, Jun 15, 2021 at 7:35 PM Yun Gao  wrote:

> Hi Padarn,
>
> By default, checkpoints are disposed of when the job finishes or
> fails;
> they are retained only when explicitly required [1].
>
> From the implementation perspective, I think users should be able to
> change the topology when restoring
> from an externalized checkpoint, but Flink does not guarantee this
> functionality.
>
> Best,
> Yun
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
>
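For completeness, a sketch of what [1] boils down to: retaining checkpoints in the job code, then restoring from one via the CLI. Paths and the jar name are placeholders, and as noted above a changed topology is not guaranteed to work:

// in the job, before execute():
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// restoring from a retained checkpoint, skipping state of dropped operators if any:
// ./bin/flink run -s hdfs:///flink-checkpoints/<job-id>/chk-42 --allowNonRestoredState my-job.jar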
> --Original Mail --
> *Sender:*Padarn Wilson 
> *Send Date:*Sat Jun 12 12:19:56 2021
> *Recipients:*user 
> *Subject:*Upgrade job topology in checkpoint
>
>> Hi all,
>>
>> I'm looking for some clarity about changing job topology as described
>> here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/upgrading/#application-topology
>>
>> My question is simple: Does this only apply to savepoints? Or can it
>> also work for checkpoints? (And if not, why?)
>>
>> Cheers,
>> Padarn
>>
>