Re: flink 1.10 support LONG as watermark?
Thanks a lot!

Jark Wu wrote on Wed, Apr 1, 2020 at 1:13 AM:

> Hi Jing,
>
> I created https://issues.apache.org/jira/browse/FLINK-16889 to support
> converting from BIGINT to TIMESTAMP.
>
> Best,
> Jark
Re: flink 1.10 support LONG as watermark?
Hi Jark!

Is there a FLIP for this? I'd be very glad to learn from the idea.

Best,
jing

Jark Wu wrote on Mon, Mar 30, 2020 at 6:52 PM:

> Hi Jingjing,
>
> The event time field must be of type TIMESTAMP(3). You can convert your
> Long value into TIMESTAMP(3) using a user-defined function.
> I'm sorry that Flink doesn't provide a built-in function for this purpose
> yet, but it will have one soon.
>
> For example:
>
>     CREATE TABLE myTable (
>       log_ts BIGINT,
>       event_time AS my_func(log_ts),
>       WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
>     ) WITH (
>       ...
>     );
>
> Here my_func is a UDF which converts BIGINT into TIMESTAMP(3).
>
> Best,
> Jark
flink 1.10 support LONG as watermark?
Hi Flinkers!

I am trying to upgrade our production jobs to 1.10 from 1.9, which is our
current version. In our case the event_time field is a Long, and in our
internal version we had implemented support for a Long field as the
watermark, which differs from the official 1.10 release. Since 1.10 added
its own watermark definition in DDL, I dropped our implementation, and now
I run into the same problem: a Long field cannot be used as the watermark.

How can I do this in the new version? If it is supported, I won't need to
migrate our internal implementation to the new version.
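The workaround suggested in the replies of this thread is a UDF that turns the BIGINT field into TIMESTAMP(3). As a minimal sketch of what that conversion amounts to, assuming log_ts holds epoch milliseconds (the class and method names here are hypothetical, and the Flink ScalarFunction wrapper is left out):

```java
import java.sql.Timestamp;

public class BigintToTimestamp {

    // Convert a BIGINT epoch value in milliseconds into a java.sql.Timestamp,
    // which the planner maps to TIMESTAMP(3).
    // In a real job this body would live in a ScalarFunction's eval() method,
    // registered under the name used in the DDL (my_func in the example above).
    public static Timestamp toTimestamp(long epochMillis) {
        return new Timestamp(epochMillis);
    }

    public static void main(String[] args) {
        // A sample epoch-millisecond value.
        Timestamp ts = toTimestamp(1_585_526_400_000L);
        System.out.println(ts.getTime());
    }
}
```

Note that FLINK-16889 (linked above) is about making this conversion built in, so on later versions no UDF should be needed.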
Re: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed
The TaskManager crashed. Check whether consecutive checkpoint failures
caused an OOM, or whether you are computing large windows over a large
dataset; a heavy data volume can also cause this problem.

Xintong Song wrote on Wed, Dec 25, 2019 at 10:28 AM:

> This exception should not be the root cause. "Slot was removed" is usually
> caused by a TaskManager crash; you need to check the corresponding
> TaskManager log to find out why it went down.
>
> Thank you~
>
> Xintong Song
>
> On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:
>
> > Occasionally an already-assigned slot is suddenly removed, which
> > restarts the job, and I cannot tell what causes it. There is no CPU
> > spike and no full GC. The exception is:
> >
> > org.apache.flink.util.FlinkException: The assigned slot
> > bae00218c818157649eb9e3c533b86af_11 was removed.
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> >     at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> >     at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
> >     at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> >     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> >     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> >     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> >     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
Typically, a *NoClassDefFoundError* is caused by a jar conflict: the same
class present in two different versions of a jar on your classpath. I
suggest you check which jar this class is loaded from, and then confirm
whether more than one jar on your classpath contains it. If so, remove the
unused jar.

I hope this helps.

best
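To find out which jar a class is actually loaded from, you can ask the JVM directly. A small sketch (the class and method names here are illustrative; in the reported case you would pass org.apache.kafka.common.serialization.ByteArrayDeserializer):

```java
import java.security.CodeSource;

public class ClassOriginFinder {

    // Return the location a class was loaded from; classes on the bootstrap
    // class path (e.g. java.* classes in modern JDKs) have no CodeSource.
    public static String originOf(String className) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className);
        CodeSource src = clazz.getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null)
                ? "(bootstrap class path)"
                : src.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // In this thread's case you would pass
        // "org.apache.kafka.common.serialization.ByteArrayDeserializer".
        System.out.println(originOf("java.util.ArrayList"));
    }
}
```

Logging this once at job startup, or running it on each TaskManager, shows immediately which of two conflicting jars is shadowing the other.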
Re: Question about CEP matching out-of-order data
In CEP SQL, with ORDER BY on the time attribute, out-of-order data does not
cause matches to be missed. I have not used this from the DataStream API;
you could check whether a corresponding API exists.

qishang zhong wrote on Mon, Dec 23, 2019 at 9:37 PM:

> Hi, everyone.
>
> A question about the flink-training-exercises project, class
> com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution:
>
>     Pattern<TaxiRide, TaxiRide> completedRides =
>         Pattern.<TaxiRide>begin("start")
>             .where(new SimpleCondition<TaxiRide>() {
>                 @Override
>                 public boolean filter(TaxiRide ride) throws Exception {
>                     return ride.isStart;
>                 }
>             })
>             .next("end")
>             .where(new SimpleCondition<TaxiRide>() {
>                 @Override
>                 public boolean filter(TaxiRide ride) throws Exception {
>                     return !ride.isStart;
>                 }
>             });
>
> I now have a similar monitoring scenario that also needs to emit the
> unmatched data after a timeout, but the stream may contain out-of-order
> data. Does that mean the Pattern in the example cannot match?
> If I want out-of-order data to be matched as well, instead of being
> emitted as timed out, is there a corresponding solution?
Re: Flink task node shut it self off.
Hi John,

In our experience the checkpoint interval is set between 1 and 10 minutes,
with the timeout usually 5x the interval. Mostly we set a 2- or 5-minute
interval with a 10- or 20-minute timeout. It depends on your data volume
per second and which window is used.

John Smith wrote on Sat, Dec 21, 2019 at 5:26 AM:

> Hi, using Flink 1.8.0
>
> First off I must say Flink's resiliency is very impressive: we lost a node
> and never lost one message, by using checkpoints and Kafka. Thanks!
>
> The cluster is self-hosted and we use our own ZooKeeper cluster. We have:
> - 3 zookeepers: 4 CPU, 8 GB (each)
> - 3 job nodes: 4 CPU, 8 GB (each)
> - 3 task nodes: 4 CPU, 8 GB (each)
>
> The nodes also share GlusterFS for storing savepoints and checkpoints;
> GlusterFS is running on the same machines.
>
> Yesterday a node shut itself off with the following log messages:
> - Stopping TaskExecutor akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
> - Stop job leader service.
> - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> - Shutting down TaskExecutorLocalStateStoresManager.
> - Shutting down BLOB cache
> - Shutting down BLOB cache
> - removed file cache directory /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
> - I/O manager removed spill file directory /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
> - Shutting down the network environment and its components.
>
> Prior to the node shutting off we noticed massive IOWAIT of 140% and a
> 1-minute CPU load of 15. We also got an hs_err file which said we should
> increase the memory.
>
> I'm attaching the logs here:
> https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0
>
> I wonder if my 5-second checkpointing is too much for Gluster.
>
> Any thoughts?
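The rule of thumb in the reply above (timeout around 5x the interval) can be written down directly. A tiny sketch; the specific numbers are this thread's suggestion, not a Flink default:

```java
import java.time.Duration;

public class CheckpointTuning {

    // Rule of thumb from this thread: checkpoint timeout ~= 5x the interval.
    public static Duration suggestedTimeout(Duration interval) {
        return interval.multipliedBy(5);
    }

    public static void main(String[] args) {
        // A 2-minute interval, as suggested above, yields a 10-minute timeout,
        // far more forgiving to a slow GlusterFS volume than checkpointing
        // every 5 seconds.
        Duration interval = Duration.ofMinutes(2);
        System.out.println(suggestedTimeout(interval));
    }
}
```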
Re: How to dynamically modify window size and type in Flink?
Currently a single job supports only one kind of window, and dynamically
modifying it is arguably a false requirement. If your goal is just quick
and convenient development, I suggest developing with SQL, for example by
extending the SQL client yourself.

LakeShen wrote on Wed, Dec 18, 2019 at 2:12 PM:

> Try a custom Trigger, and put your business triggering logic in the
> Trigger.
>
> 陈帅 wrote on Sat, Dec 14, 2019 at 6:44 PM:
>
> > Does Flink currently support dynamically changing the window size and
> > type? For example, first aggregate over a 5-minute window, then switch
> > to a 10-minute window.
Re: Question about the TIMESTAMPDIFF example in the Flink SQL docs
I don't know which version you are on. In 1.9, an attribute declared as
type TIMESTAMP needs to be formatted as yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
However, you can pass in a 13-digit (millisecond) timestamp from outside
and convert it to TIMESTAMP, for example by defining it in the DDL:

    CREATE TABLE `t` (
      ctm TIMESTAMP
    ) WITH (
      'format.schema' = 'ROW'
    )

If the source data must also be declared as TIMESTAMP, define the external
data format like this:

    DateTimeFormatter t = new DateTimeFormatterBuilder()
        .append(DateTimeFormatter.ISO_LOCAL_DATE)
        .appendLiteral('T')
        .append(new DateTimeFormatterBuilder()
            .appendPattern("HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .appendPattern("'Z'")
            .toFormatter())
        .toFormatter();

Use this structure to format the time type.

Zhenghua Gao wrote on Mon, Dec 16, 2019 at 7:42 PM:

> 1) Screenshots posted directly to the mailing list are not displayed; use
> a third-party image host and link to it.
> 2) Please confirm whether time1/time2 are of type TIMESTAMP.
> 3) TIMESTAMP '2003-01-02 10:00:00' in the documentation is a standard SQL
> timestamp literal; your TIMESTAMP time1 cannot be treated as a literal.
>
> *Best Regards,*
> *Zhenghua Gao*
>
> On Mon, Dec 16, 2019 at 3:51 PM 1530130567 <1530130...@qq.com> wrote:
>
> > Hi all:
> > My business logic needs to compute the difference between two dates, so
> > as usual I opened the official site, checked the built-in functions,
> > Ctrl-C, Ctrl-V, and it ran without problems!
> >
> > Then I changed it a bit:
> >
> >     TIMESTAMPDIFF(DAY, TIMESTAMP time1, TIMESTAMP time2)
> >
> > SQL validation failed!
> > Then I changed it again:
> >
> >     TIMESTAMPDIFF(DAY, CAST(time1 AS TIMESTAMP), CAST(time2 AS TIMESTAMP))
> >
> > It passed!
> >
> > I'm confused: does TIMESTAMP only work with a fixed string literal, and
> > not with a variable such as a column name?
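As a sanity check on the formatter from the reply above, here is a self-contained version together with a sample parse; the sample timestamp strings are made up for illustration:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class RowtimeFormat {

    // The formatter built in the reply: ISO date, literal 'T', HH:mm:ss with
    // an optional fraction of 0-9 digits, then a literal 'Z'.
    public static final DateTimeFormatter FORMAT = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral('T')
            .append(new DateTimeFormatterBuilder()
                    .appendPattern("HH:mm:ss")
                    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
                    .appendPattern("'Z'")
                    .toFormatter())
            .toFormatter();

    public static void main(String[] args) {
        // Because the fraction's minimum width is 0, strings both with and
        // without milliseconds parse successfully.
        LocalDateTime withMillis = LocalDateTime.parse("2019-12-16T15:51:00.123Z", FORMAT);
        LocalDateTime withoutMillis = LocalDateTime.parse("2019-12-16T15:51:00Z", FORMAT);
        System.out.println(withMillis + " / " + withoutMillis);
    }
}
```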