Re: flink 1.10 support LONG as watermark?

2020-03-31 Thread jingjing bai
Thanks a lot!

Jark Wu wrote on Wed, Apr 1, 2020 at 1:13 AM:

> Hi Jing,
>
> I created https://issues.apache.org/jira/browse/FLINK-16889 to support
> converting from BIGINT to TIMESTAMP.
>
> Best,
> Jark
>
> On Mon, 30 Mar 2020 at 20:30, jingjing bai 
> wrote:
>
>> Hi Jark!
>>
>> Is there a FLIP for this? I'd be glad to learn from the design.
>>
>>
>> Best,
>> jing
>>
>> Jark Wu wrote on Mon, Mar 30, 2020 at 6:52 PM:
>>
>>> Hi Jingjing,
>>>
>>> The event time field must be of type TIMESTAMP(3). You can convert your
>>> Long value into TIMESTAMP(3) using a user-defined function.
>>> I'm sorry that Flink doesn't provide a built-in function for this purpose
>>> yet, but it will have one soon.
>>>
>>> For example:
>>> CREATE TABLE myTable (
>>>  log_ts bigint,
>>>  event_time AS my_func(log_ts),
>>>  WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
>>> ) WITH (
>>>  ...
>>> );
>>>
>>> Here my_func is a UDF which converts BIGINT into TIMESTAMP(3).
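
As an illustration, here is a minimal sketch of such a UDF's core logic. The class and method names are hypothetical, and it assumes the BIGINT column holds epoch milliseconds; in Flink the conversion method would be the `eval()` of a class extending `ScalarFunction`, registered under the name used in the DDL (e.g. `my_func`):

```java
import java.sql.Timestamp;

public class LongToTimestamp {
    // In Flink this method would be the eval() of a ScalarFunction subclass
    // registered as "my_func"; shown standalone here for clarity.
    // Assumes the BIGINT value is epoch milliseconds.
    public static Timestamp toTimestamp(long epochMillis) {
        return new Timestamp(epochMillis);
    }

    public static void main(String[] args) {
        // Round-trips the epoch millis through java.sql.Timestamp
        System.out.println(toTimestamp(1585555200000L).getTime());
    }
}
```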
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 30 Mar 2020 at 18:16, jingjing bai 
>>> wrote:
>>>
>>>>
>>>> Hi flinkers!
>>>>
>>>> I'm trying to upgrade our production deployment to 1.10 from 1.9, our
>>>> current product version.
>>>> In our case the event_time field is a Long, and our internal version
>>>> implements support for a Long type as the watermark, which differs from
>>>> the official 1.10 release.
>>>> Since 1.10 adds its own watermark definition, I dropped our
>>>> implementation, and now I run into this problem.
>>>>
>>>> How can I handle this in the new version? If possible, I'd rather not
>>>> migrate our internal implementation to the new version.
>>>>
>>>>
>>>>


Re: flink 1.10 support LONG as watermark?

2020-03-30 Thread jingjing bai
Hi Jark!

Is there a FLIP for this? I'd be glad to learn from the design.


Best,
jing

Jark Wu wrote on Mon, Mar 30, 2020 at 6:52 PM:

> Hi Jingjing,
>
> The event time field must be of type TIMESTAMP(3). You can convert your
> Long value into TIMESTAMP(3) using a user-defined function.
> I'm sorry that Flink doesn't provide a built-in function for this purpose
> yet, but it will have one soon.
>
> For example:
> CREATE TABLE myTable (
>  log_ts bigint,
>  event_time AS my_func(log_ts),
>  WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND
> ) WITH (
>  ...
> );
>
> Here my_func is a UDF which converts BIGINT into TIMESTAMP(3).
>
> Best,
> Jark
>
> On Mon, 30 Mar 2020 at 18:16, jingjing bai 
> wrote:
>
>>
>> Hi flinkers!
>>
>> I'm trying to upgrade our production deployment to 1.10 from 1.9, our
>> current product version.
>> In our case the event_time field is a Long, and our internal version
>> implements support for a Long type as the watermark, which differs from
>> the official 1.10 release.
>> Since 1.10 adds its own watermark definition, I dropped our
>> implementation, and now I run into this problem.
>>
>> How can I handle this in the new version? If possible, I'd rather not
>> migrate our internal implementation to the new version.
>>
>>
>>


flink 1.10 support LONG as watermark?

2020-03-30 Thread jingjing bai
Hi flinkers!

I'm trying to upgrade our production deployment to 1.10 from 1.9, our
current product version.
In our case the event_time field is a Long, and our internal version
implements support for a Long type as the watermark, which differs from
the official 1.10 release.
Since 1.10 adds its own watermark definition, I dropped our
implementation, and now I run into this problem.

How can I handle this in the new version? If possible, I'd rather not
migrate our internal implementation to the new version.


Re: The assigned slot bae00218c818157649eb9e3c533b86af_11 was removed

2019-12-25 Thread jingjing bai
The TM went down. Check whether repeated checkpoint failures caused an OOM,
or whether you are running large windows over a large data set; a high data
volume can also cause this problem.

Xintong Song wrote on Wed, Dec 25, 2019 at 10:28 AM:

> This shouldn't be the root cause. A "slot was removed" error is usually
> caused by the TM going down; check the corresponding TM log to see why it
> died.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Dec 24, 2019 at 10:06 PM hiliuxg <736742...@qq.com> wrote:
>
> > Occasionally an assigned slot is suddenly removed, causing the job to
> > restart, and I can't tell what causes it. There are no CPU spikes and no
> > full GCs. The exception is as follows:
> >
> > org.apache.flink.util.FlinkException: The assigned slot
> > bae00218c818157649eb9e3c533b86af_11 was removed.
> > at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
> > at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
> > at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
> > at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
> > at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:847)
> > at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1161)
> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> > at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> > at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>


Re: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

2019-12-24 Thread jingjing bai
Typically, a *NoClassDefFoundError* is caused by jar conflicts: two copies
of the same class, from different versions of a jar, are on your classpath.
I suggest you check which jar this class is loaded from, then confirm
whether more than one jar on your classpath contains it; if so, remove the
unused jar.
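
One quick way to see which jar a class is actually loaded from is via its `ProtectionDomain` (a small sketch; in your own environment you would pass the class from the error, e.g. `ByteArrayDeserializer`):

```java
import java.security.CodeSource;

public class WhichJar {
    // Returns the jar/directory a class was loaded from, or a marker for
    // JDK core classes, which have no CodeSource.
    static String locationOf(Class<?> c) {
        CodeSource cs = c.getProtectionDomain().getCodeSource();
        return cs == null ? "bootstrap classpath" : cs.getLocation().toString();
    }

    public static void main(String[] args) {
        // In your job you would pass e.g.
        // org.apache.kafka.common.serialization.ByteArrayDeserializer.class
        System.out.println(locationOf(String.class));
    }
}
```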

I hope this can help you.

best


Re: Matching out-of-order data with CEP

2019-12-24 Thread jingjing bai
With CEP via SQL, the ORDER BY clause means out-of-order data will not cause
matches to be missed.
I haven't used this through the DataStream API; check whether there is a
corresponding API there.
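
As a sketch of the SQL route: the table and column names below (`Rides`, `rideId`, `rideTime`, `isStart`) are assumed from the example in the quoted message, and `rideTime` must be the event-time attribute. ORDER BY on event time makes Flink sort late rows before pattern matching:

```sql
SELECT *
FROM Rides
    MATCH_RECOGNIZE (
        PARTITION BY rideId
        ORDER BY rideTime  -- event-time order: out-of-order rows are sorted first
        MEASURES
            S.rideTime AS startTime,
            E.rideTime AS endTime
        PATTERN (S E)
        DEFINE
            S AS S.isStart,
            E AS NOT E.isStart
    )
```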

qishang zhong wrote on Mon, Dec 23, 2019 at 9:37 PM:

> Hi all,
>
> A question about the flink-training-exercises project, specifically
> com.ververica.flinktraining.solutions.datastream_java.cep.LongRidesSolution
>
> Pattern<TaxiRide, TaxiRide> completedRides =
> Pattern.<TaxiRide>begin("start")
> .where(new SimpleCondition<TaxiRide>() {
> @Override
> public boolean filter(TaxiRide ride) throws Exception {
> return ride.isStart;
> }
> })
> .next("end")
> .where(new SimpleCondition<TaxiRide>() {
> @Override
> public boolean filter(TaxiRide ride) throws Exception {
> return !ride.isStart;
> }
> });
>
> I now have a similar monitoring scenario that also needs to emit unmatched
> data after a timeout, but the stream may contain out-of-order data.
> Does that mean such data cannot match the Pattern in the example?
> If I want out-of-order data to be matched as well, instead of emitted as a
> timeout, is there a corresponding solution?
>


Re: Flink task node shut it self off.

2019-12-20 Thread jingjing bai
hi John

In our experience, we set the checkpoint interval to 1-10 minutes and the
timeout to roughly 5x the interval; mostly we use a 2 or 5 minute interval
with a 10 or 20 minute timeout.
It depends on your data volume per second and which window is used.
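
As a configuration sketch (not runnable on its own, and the numbers are just the example values above; tune them for your load), these settings look like this in the DataStream API:

```java
// Sketch: 2-minute checkpoint interval with a ~5x (10-minute) timeout.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(2 * 60 * 1000);                          // interval: 2 min
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);  // timeout: 5x interval
```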

John Smith  于2019年12月21日周六 上午5:26写道:

> Hi, using Flink 1.8.0
>
> 1st off I must say Flink resiliency is very impressive, we lost a node and
> never lost one message by using checkpoints and Kafka. Thanks!
>
> The cluster is a self hosted cluster and we use our own zookeeper cluster.
> We have...
> 3 zookeepers: 4 cpu, 8GB (each)
> 3 job nodes: 4 cpu, 8GB (each)
> 3 task nodes: 4 cpu, 8GB (each)
> The nodes also share GlusterFS for storing savepoints and checkpoints,
> GlusterFS is running on the same machines.
>
> Yesterday a node shut itself off with the following log messages...
> - Stopping TaskExecutor akka.tcp://fl...@xxx.xxx.xxx.73
> :34697/user/taskmanager_0.
> - Stop job leader service.
> - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> - Shutting down TaskExecutorLocalStateStoresManager.
> - Shutting down BLOB cache
> - Shutting down BLOB cache
> - removed file cache directory
> /tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
> - I/O manager removed spill file directory
> /tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
> - Shutting down the network environment and its components.
>
> Prior to the node shutting off we noticed massive IOWAIT of 140% and a
> 1-minute CPU load of 15. We also got an hs_err file which says we should
> increase the memory.
>
> I'm attaching the logs here:
> https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0
>
> I wonder if my 5 second checkpointing is too much for gluster.
>
> Any thoughts?
>
>
>
>
>


Re: How can Flink dynamically change the window size and type?

2019-12-18 Thread jingjing bai
Currently a single job supports only one kind of window.
Dynamically changing windows is arguably a false requirement.
If you just want fast, convenient development, I suggest developing with
SQL, for example by extending the SQL client yourself.


LakeShen wrote on Wed, Dec 18, 2019 at 2:12 PM:

> Try a custom Trigger, and put your business trigger logic in the Trigger.
>
> 陈帅 wrote on Sat, Dec 14, 2019 at 6:44 PM:
>
> > Does Flink currently support dynamically changing the window size and
> > type? For example, first aggregating over a 5-minute window, then
> > switching to a 10-minute window.
> >
>


Re: Question about the TIMESTAMPDIFF function example in the Flink SQL docs

2019-12-16 Thread jingjing bai
I don't know which version you are on.
In 1.9, a field declared as type TIMESTAMP needs to be formatted as
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'.
However, you can also pass in a 13-digit epoch timestamp from outside and
convert it to TIMESTAMP, for example with a DDL definition like:

CREATE TABLE `t` (
  ctm TIMESTAMP
) WITH (
  'format.schema' = 'ROW'
)

If the data source field should also be defined as TIMESTAMP, define the
external source format as follows:

DateTimeFormatter t = new DateTimeFormatterBuilder()
        .append(DateTimeFormatter.ISO_LOCAL_DATE)
        .appendLiteral('T')
        .append(new DateTimeFormatterBuilder()
                .appendPattern("HH:mm:ss")
                .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
                .appendPattern("'Z'")
                .toFormatter())
        .toFormatter();

Use this formatter to format the time type.
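
A self-contained sketch of using this formatter to render a 13-digit epoch-millis value (the sample value and the UTC zone are illustrative assumptions):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

public class TsFormat {
    // Same formatter as built above: ISO date, literal 'T', time with an
    // optional fractional part, then a literal 'Z'.
    static final DateTimeFormatter F = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral('T')
            .append(new DateTimeFormatterBuilder()
                    .appendPattern("HH:mm:ss")
                    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
                    .appendPattern("'Z'")
                    .toFormatter())
            .toFormatter();

    public static void main(String[] args) {
        // A 13-digit epoch-millis value, rendered as Flink 1.9 expects.
        long epochMillis = 1576512000123L;
        LocalDateTime ldt =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        System.out.println(F.format(ldt)); // prints 2019-12-16T16:00:00.123Z
    }
}
```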


Zhenghua Gao wrote on Mon, Dec 16, 2019 at 7:42 PM:

> 1) Screenshots don't render directly on the mailing list; upload to a
> third-party image host and link to it.
> 2) Please confirm whether time1/time2 are of type TIMESTAMP.
> 3) In the docs, TIMESTAMP '2003-01-02 10:00:00' is a standard SQL timestamp
> literal; your TIMESTAMP time1 cannot be treated as a timestamp literal.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Mon, Dec 16, 2019 at 3:51 PM 1530130567 <1530130...@qq.com> wrote:
>
> > Hi all:
> >   Our business logic needs to compute the difference between two dates.
> > As usual I opened the docs, looked at the built-in functions, Ctrl-C,
> > Ctrl-V, and it ran with no problem!
> >
> >   Then I changed it a bit:
> >   TIMESTAMPDIFF(DAY, TIMESTAMP time1, TIMESTAMP time2)
> >   SQL validation failed!
> >   Then I changed it again:
> >   TIMESTAMPDIFF(DAY, cast(time1 as timestamp), cast(time2 as timestamp))
> >   It passed!
> >
> >   I'm a bit confused: does TIMESTAMP only work with a fixed string
> > literal, and not with a variable such as a column name?
> >
> >
>