Re: Re: flink on yarn log issue

2020-11-03 Posted by bradyMk
Hi, how can I piece together a url to view the tm log files of a job that has already finished?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/




Re: flink-1.11.2 submitted to yarn stays stuck in CREATED

2020-11-03 Posted by admin
Could it be this issue? https://issues.apache.org/jira/browse/FLINK-19151

> On Nov 4, 2020, at 2:42 PM, 酷酷的浑蛋 wrote:
> 
> Changing taskmanager.memory.process.size: 1728m to taskmanager.memory.process.size: 2048m made it work; what is the principle behind that?
> 
> 
> 
> On Nov 4, 2020, at 11:47 AM, Yangze Guo wrote:
> Do you have a more complete AM log? We need to check the resource requests on the RM side.
> 
> Best,
> Yangze Guo
> 
> On Wed, Nov 4, 2020 at 11:45 AM 酷酷的浑蛋  wrote:
> 
> 
> 
> The error is below. It says there are no resources, but resources are sufficient; after I changed the version to 1.11.1, the job could run
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate the required slot within slot request timeout. Please make 
> sure that the cluster has enough resources.
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(FutureUtils.java:1132)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036)
>  ~[release-s
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_191]
> ... 25 more
> Caused by: java.util.concurrent.TimeoutException
> ... 23 more
> On Nov 4, 2020, at 11:20 AM, Guowei Ma wrote:
> hi,
> Have you looked at the am log? Does it report any exceptions?
> 
> Best,
> Guowei
> 
> 
> On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋  wrote:
> 
> 
> A flink-1.11.2 job submitted to yarn stays stuck in CREATED and never runs; flink-1.11.1 has no such problem
> Resources have already been allocated
> 
> 



Re: Re: Can different operators share the same state?

2020-11-03 Posted by hl9...@126.com
Thanks. In my scenario op1 and op2 are serial, so the only option is to also send op1's state downstream to op2.



hl9...@126.com
 
From: Qi Kang
Sent: 2020-11-04 14:53
To: user-zh
Subject: Re: Can different operators share the same state?
Hi,
 
Flink does not support sharing state across operators. If your op1 and op2 run in parallel, you can only share the state data indirectly through external storage. If they are serial (op2 downstream of op1), you can also consider emitting the state data produced by op1 to op2 as stream elements. Hope this helps.
 
 
> On Nov 4, 2020, at 14:48, hl9...@126.com wrote:
> 
> Hi, all:
> I defined two flatmap operators (op1 and op2); op1 defines a MapState variable, and I want to use that state directly in op2. Is that possible?
> My feeling is that it is not; I did not find a relevant api. Could someone please clarify?
> 
> 
> 
> hl9...@126.com
 


Re: Can different operators share the same state?

2020-11-03 Posted by Qi Kang
Hi,

Flink does not support sharing state across operators. If your op1 and op2 run in parallel, you can only share the state data indirectly through external storage. If they are serial (op2 downstream of op1), you can also consider emitting the state data produced by op1 to op2 as stream elements. Hope this helps.
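
To make the serial case concrete, here is a minimal sketch (hypothetical Event type and field names, not from this thread) in which op1 forwards its per-key MapState value downstream inside each element, so op2 consumes it as ordinary data:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

public class ForwardStateExample {
    // Hypothetical input record: a key plus a payload.
    public static class Event {
        public String key;
        public String payload;
    }

    // op1: owns the MapState and ships the looked-up value with each record.
    public static class Op1 extends RichFlatMapFunction<Event, Tuple2<Event, Long>> {
        private transient MapState<String, Long> counters;

        @Override
        public void open(Configuration parameters) throws Exception {
            counters = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counters", String.class, Long.class));
        }

        @Override
        public void flatMap(Event e, Collector<Tuple2<Event, Long>> out) throws Exception {
            Long c = counters.get(e.key);
            c = (c == null) ? 1L : c + 1;
            counters.put(e.key, c);
            out.collect(Tuple2.of(e, c)); // the state value travels inside the element
        }
    }

    public static DataStream<String> wire(DataStream<Event> input) {
        return input
            .keyBy(e -> e.key)
            .flatMap(new Op1())
            // op2 never touches op1's state; it just reads the forwarded value.
            .flatMap((Tuple2<Event, Long> t, Collector<String> out) ->
                    out.collect(t.f0.key + " seen " + t.f1 + " times"))
            .returns(org.apache.flink.api.common.typeinfo.Types.STRING);
    }
}

The state itself stays private to op1; only its values travel downstream.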


> On Nov 4, 2020, at 14:48, hl9...@126.com wrote:
> 
> Hi, all:
> I defined two flatmap operators (op1 and op2); op1 defines a MapState variable, and I want to use that state directly in op2. Is that possible?
> My feeling is that it is not; I did not find a relevant api. Could someone please clarify?
> 
> 
> 
> hl9...@126.com



Can different operators share the same state?

2020-11-03 Posted by hl9...@126.com
Hi, all:
I defined two flatmap operators (op1 and op2); op1 defines a MapState variable, and I want to use that state directly in op2. Is that possible?
My feeling is that it is not; I did not find a relevant api. Could someone please clarify?



hl9...@126.com


Reply: flink-1.11.2 submitted to yarn stays stuck in CREATED

2020-11-03 Posted by 酷酷的浑蛋
Changing taskmanager.memory.process.size: 1728m to taskmanager.memory.process.size: 2048m made it work; what is the principle behind that?



On Nov 4, 2020, at 11:47 AM, Yangze Guo wrote:
Do you have a more complete AM log? We need to check the resource requests on the RM side.

Best,
Yangze Guo

On Wed, Nov 4, 2020 at 11:45 AM 酷酷的浑蛋  wrote:



The error is below. It says there are no resources, but resources are sufficient; after I changed the version to 1.11.1, the job could run
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. Please make 
sure that the cluster has enough resources.
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
 ~[1.jar:?]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
 ~[1.jar:?]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168)
 ~[1.jar:?]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726)
 ~[1.jar:?]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
 ~[1.jar:?]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432)
 ~[1.jar:?]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(FutureUtils.java:1132)
 ~[1.jar:?]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036)
 ~[release-s
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_191]
... 25 more
Caused by: java.util.concurrent.TimeoutException
... 23 more
On Nov 4, 2020, at 11:20 AM, Guowei Ma wrote:
hi,
Have you looked at the am log? Does it report any exceptions?

Best,
Guowei


On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋  wrote:


A flink-1.11.2 job submitted to yarn stays stuck in CREATED and never runs; flink-1.11.1 has no such problem
Resources have already been allocated




Re: Checkpointing stops working after using BroadcastStream

2020-11-03 Posted by restart
Thanks; after changing to a scheduled read, the stream's state is indeed RUNNING



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Checkpointing stops working after using BroadcastStream

2020-11-03 Posted by restart
The finished state keeps checkpoints from triggering; I looked at the source, and the CheckpointCoordinator.triggerCheckpoint method contains a check:
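
(The screenshot that was attached here did not survive the archive. Paraphrased from memory of the Flink 1.10/1.11 CheckpointCoordinator source, the check looks roughly like the following sketch, not the verbatim code:)

// Every task that triggers the checkpoint must be RUNNING, otherwise the
// checkpoint is aborted before it starts; a bounded source subtask that has
// gone FINISHED therefore fails this check and blocks checkpoints for the
// whole job.
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
    Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
    if (ee == null || ee.getState() != ExecutionState.RUNNING) {
        throw new CheckpointException(
            CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
    }
    executions[i] = ee;
}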

 




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException

2020-11-03 Posted by Congxian Qiu
Hi
This looks like objects being reclaimed too early under specific JDK versions with certain code patterns; my guess is that it is GC-related. I saw a possibly related article earlier [1]

[1] https://cloud.tencent.com/developer/news/564780

Best,
Congxian


蒋佳成 (Jiacheng Jiang) <920334...@qq.com> wrote on Wed, Nov 4, 2020 at 11:33 AM:
>
> hi, I have hit this problem too; what is the root cause?
>
>
>
> -- Original message --
> From: "chenkaibit"  Sent: Saturday, May 9, 2020, 12:09 PM
> To: "user-zh"  Subject: Re:Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException
>
>
>
> Hi:
> After adding some logging I found that checkpointMetaData was NULL
> https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1421
> The test program reads from kafka, runs a wordcount, and writes the result to kafka. Checkpoint configuration:
> |CheckpointingMode|ExactlyOnce|
> |Interval|5s|
> |Timeout|10m0s|
>
> |MinimumPauseBetweenCheckpoints|0ms|
> |MaximumConcurrentCheckpoints|1|
>
>
> It throws the NPE consistently at checkpoint #5377
>
>
> Although the cause is still unclear, after modifying part of the code (see
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
> ) the NPE no longer appears.
>
>
> On 2020-04-21 10:21:56, chenkaibit <
> chenkai...@163.com> wrote:
> 
> 
> 
> It is not stably reproducible, but it appeared in several jobs tested recently on 1.10, with no other errors at trigger time. I have added some logging and will keep observing
> 
> 
> 
> 
> On 2020-04-21 01:12:48, Yun Tang <
> myas...@live.com> wrote:
> Hi
> 
> This NPE is a bit odd; from the executeCheckpointing method [1] it is actually hard to pin down which variable, or which variable's value, is null.
>
> One way to investigate is to enable DEBUG-level logging for org.apache.flink.streaming.runtime.tasks and narrow the scope with the debug output to determine which variable is null
> 
> When this exception occurs, is there anything unusual in the logs of the related task? What triggers this NPE, and is it stably reproducible?
> 
> [1]
> https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349
> 
> 
> Best,
> Yun Tang
> 
> 
> From: chenkaibit  Sent: Monday, April 20, 2020 18:39
> To: user-zh@flink.apache.org 
>
> Subject: flink-1.10 checkpoint occasionally throws NullPointerException
> 
>
> Has anyone run into this error: a NullPointerException during CheckpointOperation.executeCheckpointing?
>
> java.lang.Exception: Could not perform checkpoint 5505 for operator Source: KafkaTableSource(xxx) -> SourceConversion(table=[xxx, source: [KafkaTableSource(xxx)]], fields=[xxx]) -> Calc(select=[xxx) AS xxx]) -> SinkConversionToTuple2 -> Sink: Elasticsearch6UpsertTableSink(xxx) (1/1).
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$228/1024478318.call(Unknown Source)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.NullPointerException
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$229/1010499540.run(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
> ... 12 more


Re: flink-1.11.2 submitted to yarn stays stuck in CREATED

2020-11-03 Posted by Yangze Guo
Do you have a more complete AM log? We need to check the resource requests on the RM side.

Best,
Yangze Guo

On Wed, Nov 4, 2020 at 11:45 AM 酷酷的浑蛋  wrote:
>
>
>
> The error is below. It says there are no resources, but resources are sufficient; after I changed the version to 1.11.1, the job could run
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate the required slot within slot request timeout. Please make 
> sure that the cluster has enough resources.
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(FutureUtils.java:1132)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036)
>  ~[release-s
> Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
> at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) ~[?:1.8.0_191]
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_191]
> ... 25 more
> Caused by: java.util.concurrent.TimeoutException
> ... 23 more
> On Nov 4, 2020, at 11:20 AM, Guowei Ma wrote:
> hi,
> Have you looked at the am log? Does it report any exceptions?
>
> Best,
> Guowei
>
>
> On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋  wrote:
>
>
> A flink-1.11.2 job submitted to yarn stays stuck in CREATED and never runs; flink-1.11.1 has no such problem
> Resources have already been allocated
>
>


Reply: flink-1.11.2 submitted to yarn stays stuck in CREATED

2020-11-03 Posted by 酷酷的浑蛋


The error is below. It says there are no resources, but resources are sufficient; after I changed the version to 1.11.1, the job could run
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. Please make 
sure that the cluster has enough resources.
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
 ~[1.jar:?]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
 ~[1.jar:?]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168)
 ~[1.jar:?]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726)
 ~[1.jar:?]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
 ~[1.jar:?]
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432)
 ~[1.jar:?]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(FutureUtils.java:1132)
 ~[1.jar:?]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 ~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_191]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_191]
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036)
 ~[release-s
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) ~[?:1.8.0_191]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ~[?:1.8.0_191]
... 25 more
Caused by: java.util.concurrent.TimeoutException
... 23 more
On Nov 4, 2020, at 11:20 AM, Guowei Ma wrote:
hi,
Have you looked at the am log? Does it report any exceptions?

Best,
Guowei


On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋  wrote:


A flink-1.11.2 job submitted to yarn stays stuck in CREATED and never runs; flink-1.11.1 has no such problem
Resources have already been allocated




Re: Re: Re: Auto-restarting a Flink job after it dies

2020-11-03 Posted by bradyMk
Thanks for the explanation; I am already trying this approach



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:Re: Re: Auto-restarting a Flink job after it dies

2020-11-03 Posted by bradyMk
Hello. Besides killing a job by hand, I often see the whole job die from something like losing a node, or a node becoming unreachable while writing to hbase, for example:
Caused by:
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed
66 actions: Operation Timeout: 66 times, servers with issues:
172.xx.x.xx,16020,1597989428451
at
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:297)
at
org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2300(AsyncProcess.java:273)
at
org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1906)
...
For this situation it presumably cannot be pulled up automatically; the job gets killed and just needs to be resubmitted, which is why I now want an automatic job-restart feature



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Reply: Re:Re: flink-1.10 checkpoint occasionally throws NullPointerException

2020-11-03 Posted by 蒋佳成 (Jiacheng Jiang)
hi, I have hit this problem too; what is the root cause?


---- Original message ----
From: "chenkaibit"
> After adding some logging I found that checkpointMetaData was NULL
> https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1421
> The test program reads from kafka, runs a wordcount, and writes the result to kafka. Checkpoint configuration:
> |CheckpointingMode|ExactlyOnce|
> |Interval|5s|
> |Timeout|10m0s|
> |MinimumPauseBetweenCheckpoints|0ms|
> |MaximumConcurrentCheckpoints|1|
>
> It throws the NPE consistently at checkpoint #5377
>
> Although the cause is still unclear, the NPE no longer appears after the changes in
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>
> On 2020-04-21 10:21:56, chenkaibit wrote:
> (the remainder of this quote is mojibake-damaged in the archive; see Congxian Qiu's message above for the full text, including Yun Tang's pointer to
> https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1349 )

Re: flink-1.11.2 submitted to yarn stays stuck in CREATED

2020-11-03 Posted by Guowei Ma
hi,
Have you looked at the am log? Does it report any exceptions?

Best,
Guowei


On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋  wrote:

>
> A flink-1.11.2 job submitted to yarn stays stuck in CREATED and never runs; flink-1.11.1 has no such problem
> Resources have already been allocated
>
>


flink-1.11.2 submitted to yarn stays stuck in CREATED

2020-11-03 Posted by 酷酷的浑蛋


A flink-1.11.2 job submitted to yarn stays stuck in CREATED and never runs; flink-1.11.1 has no such problem
Resources have already been allocated



Re: Problem with Flink SQL dual-stream join using LAST_VALUE + INTERVAL JOIN

2020-11-03 Posted by xiao cai
Hi wang:
Thanks a lot for the explanation; I will follow your line of thinking and look into this process in detail.
Good luck.
Best, 
xiao


 Original message 
From: hailongwang<18868816...@163.com>
To: user-zh
Sent: Tuesday, Nov 3, 2020, 21:42
Subject: Re: Problem with Flink SQL dual-stream join using LAST_VALUE + INTERVAL JOIN


Hi xiao, from the error, this SQL matched `StreamExecJoinRule`, and a regular join must not have rowtime attributes. The likely cause is that the rowtime of your kafkaSource1 table goes through the group by with last_value and is therefore no longer of the time attribute type (`TimeIndicatorRelDataType`); when the rule then checks the plan there are no windowBounds, so you get this error. Best, Hailong Wang

On 2020-11-03 18:27:51, "xiao cai" wrote:
>Hi :
>flink version 1.11.2
>Problem: in a dual-stream join using last_value + interval join, I get the error: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
>
>Code:
>// stream 1
>create table kafkaSource1 (
>id int,
>field_1 int,
>field_2 varchar,
>ts1 timestamp(3),
>watermark for `ts1`
>) with (
>connector = kafka
>)
>// stream 2
>create table kafkaSource2 (
>id int,
>field_3
>ts2 timestamp(3),
>watermark for `ts2`
>) with (
>connector = kafka
>)
>
>//create view
>create view kafkaSource1_view as
>select
>field_1 as field_1,
>last_value(field_2) as field_2,
>last_value(ts1) as ts1
>from kafkaSouce1
>group by field_1
>
>// query
>insert into sinkTable
>select
>a.field_1,
>b.field_3
>from kafkaSource2 a join kafkaSource1_view b
>on a.id = b.id
>and a.ts >= b.ts - INTERVAL '1' HOUR and a.ts < b.ts + INTERVAL '2' DAY

Re: TUMBLE function does not support retract streams

2020-11-03 Posted by LakeShen
Hi 夜思流年梦,

Check what kind of table your dwd_XXX is: an append stream or a retract stream.
If it is retract, you should not be able to run window computations on top of it.

Best,
LakeShen

史 正超  wrote on Tue, Nov 3, 2020 at 6:34 PM:

> The canal-json format also carries delete and update records, and so does changelog-json. Both support INSERT
> UPDATE DELETE; the relevant code follows:
>
> @Override
> public ChangelogMode getChangelogMode() {
>     return ChangelogMode.newBuilder()
>         .addContainedKind(RowKind.INSERT)
>         .addContainedKind(RowKind.UPDATE_BEFORE)
>         .addContainedKind(RowKind.UPDATE_AFTER)
>         .addContainedKind(RowKind.DELETE)
>         .build();
> }
>
> So consuming data containing updates and deletes inside a window should currently be unsupported.
> 
> From: 夜思流年梦 
> Sent: Nov 3, 2020 9:46
> To: user-zh@flink.apache.org 
> Subject: TUMBLE function does not support retract streams
>
>
>
>
> This question got buried last time, so I am raising it again; from admin's last reply it sounded like this is supported, so I am not sure whether my usage is wrong or flink does not support this feature;
>
>
>
>
>
>
>
> Original sql
>
> select 0 as id
>
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
>
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> then memberid else NULL end) as paynum_h
>
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h
>
> from dwd_XXX
>
> where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>
> group by TUMBLE(proctime ,interval '1' HOUR);
>
>
> Error:
>  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> I found that changing the kafka DDL to the json format makes it work
>
>
> The data source was not obtained via flink-mysql-cdc
>
>
> The binlog is written to kafka via canal, and then a kafka table (the ods table) was created,
>
>
>  'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XX',
>   'topic' = 'ODS_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'canal-json');
>
>
> The dwd_XXX table queried above is built from this ods table with one layer of data cleansing and then insert into;
> the format used when creating that kafka table is changelog-json:
>
>
> WITH (
>   'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XXX',
>   'topic' = 'DWD_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'changelog-json');
>
>
>
>
>
>
>
>
>
>
>
> On 2020-10-30 14:53:09, "admin" <17626017...@163.com> wrote:
> >Hi,
> >Can you paste the complete sql? Is the data source CDC data?
> >
> >> On Oct 30, 2020, at 2:48 PM, 夜思流年梦  wrote:
> >>
> >> Hello developers:
> >> I have this scenario:
> >> computing the revenue of each hour; I planned to use the TUMBLE function, but found it does not support retract streams
> >> select
> >>
> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> >>
> >>> ,sum(amt) as paymoney_h
> >>
> >>> from 
> >>
> >>> group by TUMBLE(write_time,interval '1' HOUR);
> >>
> >>
> >> Error:
> >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> >>
> >>
> >>
> >>
> >> I found that changing the kafka DDL to the json format makes it work
> >>
> >>
> >> For now I can only group by a time partition myself to get the statistics; I wanted to use this grouped window but it does not support retract streams. Why doesn't such a nice feature support this, and is there any plan to?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>
>


Re: Re: Re: Auto-restarting a Flink job after it dies

2020-11-03 Posted by LakeShen
Hi bradyMk,

Broadly there are two approaches:
   1. After the job is killed, restore from a checkpoint when pulling it back up. This requires knowing the latest
checkpoint before the job ended, and restoring from there.
   A Flink job's checkpoint path is the checkpoint root path plus the job_id, so you can look in that directory for
the newest chk-xx checkpoint and restore from it.
   Of course, there are also other ways to learn the latest checkpoint before the job was killed. Pick whichever suits you, then automate it to lower the operations cost.

   2. After the job is killed, restore from a savepoint; you can take a savepoint on a schedule.
Because the state then lags the current time, data may be replayed and downstream may see duplicate messages.

  I would suggest implementing option 1 (a sketch follows).
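
As a rough illustration of option 1, a sketch (hypothetical checkpoint root; assumes the Hadoop client on the classpath) that locates the newest chk-N directory of a job so an automated wrapper can resubmit with flink run -s:

import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LatestCheckpointFinder {
    // Returns the newest chk-<n> directory under <checkpointRoot>/<jobId>, if any.
    // A more robust version should also verify that the _metadata file exists,
    // since the newest directory could belong to an incomplete checkpoint.
    static Optional<Path> findLatest(FileSystem fs, Path jobCheckpointDir) throws Exception {
        FileStatus[] children = fs.listStatus(jobCheckpointDir,
            p -> p.getName().startsWith("chk-"));
        return Arrays.stream(children)
            .map(FileStatus::getPath)
            .max(Comparator.comparingLong(
                p -> Long.parseLong(p.getName().substring("chk-".length()))));
    }

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // "/flink/checkpoints" is a placeholder for your checkpoint root path.
        Path dir = new Path("/flink/checkpoints/" + args[0]); // args[0] = job_id
        findLatest(fs, dir).ifPresent(p ->
            System.out.println("resume with: flink run -s " + p + " ..."));
    }
}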

Best,
LakeShen



hailongwang <18868816...@163.com> wrote on Tue, Nov 3, 2020 at 7:21 PM:

> Hi bradyMk,
>
>
> In on-yarn mode, if a container is killed, it is pulled back up automatically.
> As for the whole job being killed, that case should be an explicit manual stop, right?
> As for restarts, you can set the number of restart attempts to a very large number (effectively infinite restarts), but once a job keeps restarting like that, I personally think pulling it up again is useless anyway;
>   at that point the platform's alerting should take over for manual intervention.
> Best,
> Hailong Wang
>
>
>
>
> On 2020-11-03 09:32:50, "bradyMk" wrote:
> >Hello, what you describe is the failure restart strategy, but if the job is killed in certain situations or exceeds the restart attempts, it exits as well; I mainly want restarts for those cases
> >
> >
> >
> >-
> >Best Wishes
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Question about eventTime with the kafka table connector

2020-11-03 Posted by hailongwang
Hi marble,
 If you develop with the Datastream API, the usage of the Kafka connector is covered in reference [1],
and the usage of EventTime and WaterMark in reference [2].
The corresponding Chinese docs are references [3] and [4]. A minimal sketch follows the links below.
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/connectors/kafka.html
[4] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/event_timestamps_watermarks.html
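
A minimal sketch of what the quoted exception is asking for in the DataStream case (hypothetical Trade type and field names): forBoundedOutOfOrderness alone does not extract event timestamps, so a withTimestampAssigner is still needed before event-time windows will work:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeExample {
    // Hypothetical record type: a market code plus an epoch-millis transTime.
    public static class Trade {
        public String marketCode;
        public long transTime;
    }

    static void window(DataStream<Trade> ds) {
        ds.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Trade>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    // Without this assigner records keep Long.MIN_VALUE timestamps
                    // and event-time windows fail exactly as quoted below.
                    .withTimestampAssigner((trade, previous) -> trade.transTime))
            .keyBy(t -> t.marketCode)
            .timeWindow(Time.minutes(1L))
            .reduce((a, b) -> b); // placeholder window function
    }
}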


Best,
Hailong Wang

On 2020-11-03 09:12:42, "marble.zh...@coinflex.com.INVALID"
 wrote:
>Hello.
>When using the kafka table connector with eventTime, how do I enable this eventTime? I have not found a corresponding sample.
>
>This is what I do:
>1. set the Stream environment: setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>2. specify the watermark in the connector, where transTime is a field in the message
>"rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(transTime / 1000, '-MM-dd
>HH:mm:ss')), \n " +
>"WATERMARK FOR rowtime AS rowtime - INTERVAL '10' SECOND \n" +
>
>3. then use the datastream window directly
>ds.keyBy(marketCode).timeWindow(Time.minutes(1L)); 
>
>But at runtime it reports the exception below. It is already defined in the connector; do I still need to assign it?
>java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>timestamp marker). Is the time characteristic set to 'ProcessingTime', or
>did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>
>Even if I define a strategy on the datastream,
>ds.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)));
>
>it still reports the same error as above.
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Problem with Flink SQL dual-stream join using LAST_VALUE + INTERVAL JOIN

2020-11-03 Posted by hailongwang
Hi xiao,


From the error, this SQL matched `StreamExecJoinRule`, and a regular join must not have rowtime attributes.
The likely cause is that the rowtime of your kafkaSource1 table goes through the group by with last_value and is
therefore no longer of the time attribute type (`TimeIndicatorRelDataType`); when the rule then checks the plan there
are no windowBounds, so you get this error.


Best,
Hailong Wang
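
For reference, a minimal sketch of the workaround the error message itself suggests (hypothetical setup, Flink 1.11 Blink planner; whether LAST_VALUE accepts the casted type can vary by version): cast the time attribute to a plain TIMESTAMP before the join, accepting that the join then runs as a regular join whose state is not cleaned up by time:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CastRowtimeWorkaround {
    public static void main(String[] args) {
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
            StreamExecutionEnvironment.getExecutionEnvironment(),
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // kafkaSource1 / kafkaSource2 DDL from the quoted mail is assumed to exist.

        // Casting ts1 strips the rowtime (time attribute) property, so the
        // aggregated view no longer feeds a rowtime into the downstream join.
        tEnv.executeSql(
            "create view kafkaSource1_view as " +
            "select field_1, last_value(field_2) as field_2, " +
            "       last_value(cast(ts1 as timestamp(3))) as ts1 " +
            "from kafkaSource1 group by field_1");
    }
}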

On 2020-11-03 18:27:51, "xiao cai" wrote:
>Hi :
>flink version 1.11.2
>Problem: in a dual-stream join using last_value + interval join, I get the error: Rowtime attributes must not be in 
>the input rows of a regular join. As a workaround you can cast the time 
>attributes of input tables to TIMESTAMP before.
>
>
>Code:
>// stream 1
>create table kafkaSource1 (
>id int,
>field_1 int,
>field_2 varchar,
>ts1 timestamp(3),
>watermark for `ts1` 
>) with (
>connector = kafka
>)
>// stream 2
>create table kafkaSource2 (
>id int,
>field_3
>ts2 timestamp(3),
>watermark for `ts2` 
>) with (
>connector = kafka
>)
>
>
>//create view
>create view kafkaSource1_view as 
>select 
>field_1 as field_1,
>last_value(field_2) as field_2,
>last_value(ts1) as ts1
>from kafkaSouce1 
>group by field_1
>
>
>// query 
>insert into sinkTable 
>select 
>a.field_1,
>b.field_3
>from kafkaSource2 a join kafkaSource1_view b
>on a.id = b.id
>and a.ts >= b.ts - INTERVAL '1' HOUR and a.ts < b.ts + INTERVAL '2' DAY


Re: Unsubscribe

2020-11-03 Posted by Xingbo Huang
Hi,

To unsubscribe, please send an email to  user-zh-unsubscr...@flink.apache.org

See [1] for details

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

夏明  wrote on Tue, Nov 3, 2020 at 8:01 PM:

> Unsubscribe


Re: Checkpointing stops working after using BroadcastStream

2020-11-03 Posted by JasonLee
hi
Checkpointing can currently only be used on streaming data; the mongo data you read is a bounded source, which does not support checkpointing, so the checkpoint of the whole job fails. You can turn the mongo read into a periodic read, and then the checkpoint should complete.
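
A sketch of that idea (hypothetical loadFromMongo() standing in for the real query): a source that re-reads periodically never reaches FINISHED, so checkpoints keep triggering:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class PeriodicMongoSource extends RichSourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            for (String rule : loadFromMongo()) { // re-read the full data set
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(rule);
                }
            }
            Thread.sleep(60_000L); // refresh interval; the subtask stays RUNNING
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private java.util.List<String> loadFromMongo() {
        return java.util.Collections.emptyList(); // placeholder for the Mongo query
    }
}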



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Checkpointing stops working after using BroadcastStream

2020-11-03 Posted by 赵一旦
Why would FINISHED keep the ckpt from triggering?

restart  wrote on Tue, Nov 3, 2020 at 10:11 AM:

> Problem: after the job consumes a broadcast stream, checkpointing stops working.
>
> Description: the broadcast data comes from two places, one from mongo and one from kafka; the data is unioned and a Watermark is assigned that returns Watermark.MAX_WATERMARK (so that after connecting with the main source, the window aggregation watermark still advances). After the job is deployed, the mongo source's state becomes FINISHED. I read online that a subtask
> in finished state keeps checkpoints from triggering; how can I keep a custom data source (which is more like a DataSet) and still guarantee that checkpoints trigger normally?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Unsubscribe

2020-11-03 Posted by 夏明
Unsubscribe

Problem with Flink SQL dual-stream join using LAST_VALUE + INTERVAL JOIN

2020-11-03 Posted by xiao cai
Hi :
flink version 1.11.2
Problem: in a dual-stream join using last_value + interval join, I get the error: Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.


Code:
// stream 1
create table kafkaSource1 (
id int,
field_1 int,
field_2 varchar,
ts1 timestamp(3),
watermark for `ts1` 
) with (
connector = kafka
)
// stream 2
create table kafkaSource2 (
id int,
field_3
ts2 timestamp(3),
watermark for `ts2` 
) with (
connector = kafka
)


//create view
create view kafkaSource1_view as 
select 
field_1 as field_1,
last_value(field_2) as field_2,
last_value(ts1) as ts1
from kafkaSouce1 
group by field_1


// query 
insert into sinkTable 
select 
a.field_1,
b.field_3
from kafkaSource2 a join kafkaSource1_view b
on a.id = b.id
and a.ts >= b.ts - INTERVAL '1' HOUR and a.ts < b.ts + INTERVAL '2' DAY

Re:Re: Re: Auto-restarting a Flink job after it dies

2020-11-03 Posted by hailongwang
Hi bradyMk,


In on-yarn mode, if a container is killed, it is pulled back up automatically.
As for the whole job being killed, that case should be an explicit manual stop, right?
As for restarts, you can set the number of restart attempts to a very large number (effectively infinite restarts), but once a job keeps restarting like that, I personally think pulling it up again is useless anyway;
  at that point the platform's alerting should take over for manual intervention.
Best,
Hailong Wang




On 2020-11-03 09:32:50, "bradyMk" wrote:
>Hello, what you describe is the failure restart strategy, but if the job is killed in certain situations or exceeds the restart attempts, it exits as well; I mainly want restarts for those cases
>
>
>
>-
>Best Wishes
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: [PyFlink] Errors writing data to kafka in Csv() format, and python udf failing to start

2020-11-03 Posted by Xingbo Huang
Hi,

1. python demo.py running fine means that neither your code nor the local python environment you run it in has a problem.
2. The error you give now is not the same as the compile error in your previous mail; let's take the problems one at a time.
3.
This runtime error means the beam package in the python environment of your cluster runtime conflicts; as I answered at the beginning, you have to go and check whether the python environment of the cluster runtime meets the requirements.

Best,
Xingbo

jing  wrote on Tue, Nov 3, 2020 at 6:09 PM:

> 1. Reinstalling did not resolve it.
> Both local and remote submission have the problem. Running it as python demo.py works fine.
> 2. The job is indeed submitted.
> It prints Job has been submitted with JobID 05fcaebfec3aca731df408418ebea80c
> and then the error below appears immediately:
>
> i.e.: Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.beam.sdk.options.PipelineOptionsFactory
>
>
>
> Traceback (most recent call last):
>   File "docs/examples/udf/demo.py", line 37, in 
> word_count()
>   File "docs/examples/udf/demo.py", line 32, in word_count
> t_env.execute("word_count")
>   File
>
> "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py",
> line 1057, in execute
> return JobExecutionResult(self._j_tenv.execute(job_name))
>   File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line
> 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
> line 147, in deco
> return f(*a, **kw)
>   File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328,
> in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: 05fcaebfec3aca731df408418ebea80c)
> at
>
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at
>
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
> at
>
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> at
>
> org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
>
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
>
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
>
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
>
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
>
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
>
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: 05fcaebfec3aca731df408418ebea80c)
> at
>
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
> at
>
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
>
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
>
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
> at
>
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
>
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
>
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
>
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
> at
>
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
>
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
>
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
>
> 

回复: TUMBLE函数不支持 回撤流

2020-11-03 文章 史 正超
canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT 
UPDATE DELETE, 相关代码如下:

@Override
public ChangelogMode getChangelogMode() {
    return ChangelogMode.newBuilder()
        .addContainedKind(RowKind.INSERT)
        .addContainedKind(RowKind.UPDATE_BEFORE)
        .addContainedKind(RowKind.UPDATE_AFTER)
        .addContainedKind(RowKind.DELETE)
        .build();
}

So consuming data containing updates and deletes inside a window should currently be unsupported.

From: 夜思流年梦 
Sent: Nov 3, 2020 9:46
To: user-zh@flink.apache.org 
Subject: TUMBLE function does not support retract streams




This question got buried last time, so I am raising it again; from admin's last reply it sounded like this is supported, so I am not sure whether my usage is wrong or flink does not support this feature;







Original sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then 
memberid else NULL end) as paynum_h

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')  
then real_product else 0 end)) as paymoney_h

from dwd_XXX

where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


Error:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
support consuming update and delete changes which is produced by node 
TableSourceScan
I found that changing the kafka DDL to the json format makes it work


The data source was not obtained via flink-mysql-cdc


The binlog is written to kafka via canal, and then a kafka table (the ods table) was created,


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


The dwd_XXX table queried above is built from this ods table with one layer of data cleansing and then insert into;
the format used when creating that kafka table is changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');











On 2020-10-30 14:53:09, "admin" <17626017...@163.com> wrote:
>Hi,
>Can you paste the complete sql? Is the data source CDC data?
>
>> On Oct 30, 2020, at 2:48 PM, 夜思流年梦  wrote:
>>
>> Hello developers:
>> I have this scenario:
>> computing the revenue of each hour; I planned to use the TUMBLE function, but found it does not support retract streams
>> select
>>
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>
>>> ,sum(amt) as paymoney_h
>>
>>> from 
>>
>>> group by TUMBLE(write_time,interval '1' HOUR);
>>
>>
>> Error:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>> support consuming update and delete changes which is produced by node 
>> TableSourceScan
>>
>>
>>
>>
>> I found that changing the kafka DDL to the json format makes it work
>>
>>
>> For now I can only group by a time partition myself to get the statistics; I wanted to use this grouped window but it does not support retract streams. Why doesn't such a nice feature support this, and is there any plan to?
>>
>>
>>
>>
>>
>>
>>
>>
>>








Re: [PyFlink] Errors writing data to kafka in Csv() format, and python udf failing to start

2020-11-03 Posted by jing
1. Reinstalling did not resolve it.
Both local and remote submission have the problem. Running it as python demo.py works fine.
2. The job is indeed submitted.
It prints Job has been submitted with JobID 05fcaebfec3aca731df408418ebea80c
and then the error below appears immediately:

i.e.: Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.apache.beam.sdk.options.PipelineOptionsFactory



Traceback (most recent call last):
  File "docs/examples/udf/demo.py", line 37, in 
word_count()
  File "docs/examples/udf/demo.py", line 32, in word_count
t_env.execute("word_count")
  File
"/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 147, in deco
return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328,
in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute.
: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: 05fcaebfec3aca731df408418ebea80c)
at
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: 05fcaebfec3aca731df408418ebea80c)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
at
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at

Re: [PyFlink] Errors writing data to kafka in Csv() format, and python udf failing to start

2020-11-03 Posted by Xingbo Huang
Hi,
1. Was the earlier error resolved after reinstalling pyflink (does local python
demo.py run fine)? Did that earlier error occur when running locally, or only when submitting remotely?
2. The current error happens at the compile stage of job submission, before the job even runs. The error log should be visible in the console where the job is submitted; could you provide it?

Best,
Xingbo

jing  wrote on Tue, Nov 3, 2020 at 5:36 PM:

> Hi, xingbo.
> After checking, I did not find any other beam version; the flink-python version is 2.11-1.11.1. With or without pip install
> apache-beam==2.19.0, the problem is the same.
> The udf sample code does not pass either. Local python demo.py works fine.
> Only flink run -m localhost:8081 -py demo.py keeps hitting this problem.
> pyflink-shell.sh remote localhost 8081 was tried too, with the same result.
>
> Sample code:
>
> import logging
> import sys
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
> DataTypes
> from pyflink.table.udf import udf
>
>
> def word_count():
> content = "line Licensed to the Apache Software Foundation ASF under
> one
> " \
>   "line or more contributor license agreements See the NOTICE
> file " \
>   "line distributed with this work for additional information "
> \
>   "line regarding copyright ownership The ASF licenses this
> file
> " \
>   "to you under the Apache License Version the " \
>   "License you may not use this file except in compliance " \
>   "with the License"
> t_env = StreamTableEnvironment.create(
> StreamExecutionEnvironment.get_execution_environment(),
>
>
> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
> )
>
>
> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
> True)
> sink_ddl = """
> create table Results(word VARCHAR,`count` BIGINT) with (
> 'connector'
> = 'print')
> """
> add = udf(lambda i: i + 1024, [DataTypes.BIGINT()], DataTypes.BIGINT())
> t_env.register_function("add_test", add)
> t_env.sql_update(sink_ddl)
> elements = [(word, 1) for word in content.split(" ")]
> t_env.from_elements(elements, ["word", "count"]) \
> .group_by("word") \
> .select("word, add_test(count(1)) as count") \
> .insert_into("Results")
> t_env.execute("word_count")
>
>
> if __name__ == '__main__':
> logging.basicConfig(stream=sys.stdout, level=logging.INFO,
> format="%(message)s")
> word_count()
>
>
> The environment is based on the official docker image flink:1.11.1-scala_2.11; JobManager and TaskManager are both normal, and jobs without
> udf all run fine. The only jars installed are the jdbc, kafka and es connectors, plus the csv jar.
>
> Do I need to install something in this situation, or change some configuration?
>
>
> The log shows:
> 2020-11-03 09:24:05,792 ERROR org.apache.flink.client.python.PythonDriver
>
> [] - Run python process failed
> java.lang.RuntimeException: Python process exits with code: 1
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:88)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_265]
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_265]
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_265]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> 2020-11-03 09:24:05,798 ERROR org.apache.flink.client.cli.CliFrontend
>
> [] - Fatal error while running command line interface.
> org.apache.flink.client.program.ProgramAbortException: null
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
> ~[flink-python_2.11-1.11.1.jar:1.11.1]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_265]
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_265]
> at
>
> 

TUMBLE function does not support retract streams

2020-11-03 Posted by 夜思流年梦



This question got buried last time, so I am raising it again; from admin's last reply it sounded like this is supported, so I am not sure whether my usage is wrong or flink does not support this feature;







Original sql 

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then 
memberid else NULL end) as paynum_h 

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')  
then real_product else 0 end)) as paymoney_h  

from dwd_XXX

where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


Error:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
support consuming update and delete changes which is produced by node 
TableSourceScan
I found that changing the kafka DDL to the json format makes it work


The data source was not obtained via flink-mysql-cdc


The binlog is written to kafka via canal, and then a kafka table (the ods table) was created,


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


The dwd_XXX table queried above is built from this ods table with one layer of data cleansing and then insert into;
the format used when creating that kafka table is changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');  











On 2020-10-30 14:53:09, "admin" <17626017...@163.com> wrote:
>Hi,
>Can you paste the complete sql? Is the data source CDC data?
>
>> On Oct 30, 2020, at 2:48 PM, 夜思流年梦  wrote:
>> 
>> Hello developers:
>> I have this scenario:
>> computing the revenue of each hour; I planned to use the TUMBLE function, but found it does not support retract streams
>> select 
>> 
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>> 
>>> ,sum(amt) as paymoney_h  
>> 
>>> from 
>> 
>>> group by TUMBLE(write_time,interval '1' HOUR);
>> 
>> 
>> Error:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
>> support consuming update and delete changes which is produced by node 
>> TableSourceScan
>> 
>> 
>> 
>> 
>> I found that changing the kafka DDL to the json format makes it work
>> 
>> 
>> For now I can only group by a time partition myself to get the statistics; I wanted to use this grouped window but it does not support retract streams. Why doesn't such a nice feature support this, and is there any plan to?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 





 

How to dynamically submit and manage Flink Jobs

2020-11-03 Posted by 林影
Just as Apache Livy can submit jobs through a Rest API, does Flink have a similar mechanism?
Our scenario turns business conditions into Flink SQL, and we would like to submit jobs dynamically the way livy does; has anyone in the community put this into practice?
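
For reference, Flink's own REST API on the JobManager can play this role: upload a jar with POST /jars/upload, then start it with POST /jars/<jar-id>/run. A minimal sketch (Java 11 HttpClient; hypothetical host, jar id and entry class, and the multipart upload step is only described, not shown):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestSubmit {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Step 1 (not shown): POST multipart/form-data to http://jobmanager:8081/jars/upload;
        // the JSON response contains the stored jar's filename, which acts as its id.
        String jarId = "example-job.jar"; // hypothetical id from the upload response

        // Step 2: run the uploaded jar; entry class and program args go in the JSON body.
        HttpRequest run = HttpRequest.newBuilder()
            .uri(URI.create("http://jobmanager:8081/jars/" + jarId + "/run"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(
                "{\"entryClass\":\"com.example.SqlRunner\"," +
                "\"programArgs\":\"--sql-id 42\"}"))
            .build();

        HttpResponse<String> resp = client.send(run, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.body()); // contains the new jobid on success
    }
}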


Re: [PyFlink] Errors writing data to kafka in Csv() format, and python udf failing to start

2020-11-03 Posted by jing
Hi, xingbo.
After checking, I did not find any other beam version; the flink-python version is 2.11-1.11.1. With or without pip install
apache-beam==2.19.0, the problem is the same.
The udf sample code does not pass either. Local python demo.py works fine.
Only flink run -m localhost:8081 -py demo.py keeps hitting this problem.
pyflink-shell.sh remote localhost 8081 was tried too, with the same result.

Sample code:

import logging
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import udf


def word_count():
content = "line Licensed to the Apache Software Foundation ASF under one
" \
  "line or more contributor license agreements See the NOTICE
file " \
  "line distributed with this work for additional information "
\
  "line regarding copyright ownership The ASF licenses this file
" \
  "to you under the Apache License Version the " \
  "License you may not use this file except in compliance " \
  "with the License"
t_env = StreamTableEnvironment.create(
StreamExecutionEnvironment.get_execution_environment(),
   
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
)
   
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
True)
sink_ddl = """
create table Results(word VARCHAR,`count` BIGINT) with ( 'connector'
= 'print')
"""
add = udf(lambda i: i + 1024, [DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add_test", add)
t_env.sql_update(sink_ddl)
elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]) \
.group_by("word") \
.select("word, add_test(count(1)) as count") \
.insert_into("Results")
t_env.execute("word_count")


if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format="%(message)s")
word_count()


The environment is based on the official docker image flink:1.11.1-scala_2.11; JobManager and TaskManager are both normal, and jobs without
udf all run fine. The only jars installed are the jdbc, kafka and es connectors, plus the csv jar.

Do I need to install something in this situation, or change some configuration?

The log shows:

2020-11-03 09:24:05,792 ERROR org.apache.flink.client.python.PythonDriver   
  
[] - Run python process failed
java.lang.RuntimeException: Python process exits with code: 1
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:88)
~[flink-python_2.11-1.11.1.jar:1.11.1]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_265]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_265]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_265]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.1.jar:1.11.1]
2020-11-03 09:24:05,798 ERROR org.apache.flink.client.cli.CliFrontend   
  
[] - Fatal error while running command line interface.
org.apache.flink.client.program.ProgramAbortException: null
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
~[flink-python_2.11-1.11.1.jar:1.11.1]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_265]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_265]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_265]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at

Re: kerberos issue in yarn deployment mode

2020-11-03 Posted by Yangze Guo
Hello,

What exactly does "authenticating as the specified user via Kerberos on the client" in your description refer to?

-yD security.kerberos.login.principal=xxx -yD security.kerberos.login.keytab=xxx
The effect of these two parameters is to enable the
HadoopModule in Flink; that module uses UserGroupInformation to handle the Kerberos authentication. Also, in a Yarn deployment, they make Flink upload the Keytab into the containers on yarn for you.

You can take another look at the community documentation [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/security-kerberos.html
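
For intuition, what HadoopModule effectively does under these settings is a keytab login through UserGroupInformation; a sketch of the equivalent manual call (hypothetical principal and keytab path):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KeytabLogin {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Comparable in spirit to security.kerberos.login.principal/keytab:
        // subsequent HDFS access from this process runs as this user.
        UserGroupInformation.loginUserFromKeytab(
            "flinkuser@EXAMPLE.COM",               // hypothetical principal
            "/etc/security/keytabs/flink.keytab"); // hypothetical keytab path
        System.out.println("Logged in as: " + UserGroupInformation.getLoginUser());
    }
}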

Best,
Yangze Guo

On Tue, Nov 3, 2020 at 4:17 PM amen...@163.com  wrote:
>
> hi everyone,
>
> Recently, when submitting jobs to a yarn queue in per-job mode with flink-1.11.1, I ran into a kerberos authentication problem.
>
> Details: on the client I authenticate as the specified user via Kerberos and submit the flink job to the yarn queue; it submits normally, but when yarn assigns the job to a node for execution, the error indicates that it needs to operate on hdfs (to create the checkpoint and savepoint directories, since I use the FileSystem
>  StateBackend) without having obtained the permission to do so, so kerberos blocks it in the usual way.
>
> From searching the community mailing list I learned that using the -yD parameters avoids this problem, but in theory, once the client passes authentication and the submission to yarn succeeds, shouldn't the permissions carry over to both the submitting node and the executing nodes?
>
> Are -yD security.kerberos.login.principal=xxx -yD 
> security.kerberos.login.keytab=xxx used purely to solve this kind of problem? Please help me understand~
>
> best,
> amenhub


kerberos issue in yarn deployment mode

2020-11-03 Posted by amen...@163.com
hi everyone,

Recently, when submitting jobs to a yarn queue in per-job mode with flink-1.11.1, I ran into a kerberos authentication problem.

Details: on the client I authenticate as the specified user via Kerberos and submit the flink job to the yarn queue; it submits normally, but when yarn assigns the job to a node for execution, the error indicates that it needs to operate on hdfs (to create the checkpoint and savepoint directories, since I use the FileSystem
 StateBackend) without having obtained the permission to do so, so kerberos blocks it in the usual way.

From searching the community mailing list I learned that using the -yD parameters avoids this problem, but in theory, once the client passes authentication and the submission to yarn succeeds, shouldn't the permissions carry over to both the submitting node and the executing nodes?

Are -yD security.kerberos.login.principal=xxx -yD 
security.kerberos.login.keytab=xxx used purely to solve this kind of problem? Please help me understand~

best,
amenhub