Hi,
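Take a look at how event timestamps are generated: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html
In the examples below, the timestamps always come from a time field inside the element itself, not from the time the record arrived. There is also an AscendingTimestampExtractor.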
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// MyEvent is your own event type; getCreationTime() returns the time the event was
// created at its origin. Each public class below belongs in its own .java file.

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // hasWatermarkMarker() is a method of your own event type
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
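To see how this answers the question about out-of-order arrival, here is a minimal, Flink-free sketch of the same bounded-out-of-orderness logic in plain Java. The Event class, its creationTime field and the sample values are hypothetical and only for illustration. The point is that each element keeps the timestamp it was created with, whatever order it arrives in, and the watermark (highest timestamp seen minus 3.5 s) decides which elements count as late.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Flink-free sketch of the BoundedOutOfOrdernessGenerator idea above.
// Event and all names here are hypothetical, for illustration only.
class BoundedOutOfOrdernessDemo {

    // Hypothetical event: creationTime is stamped by the producer (e.g. a phone),
    // so it does not depend on when Kafka or Flink receives the record.
    static final class Event {
        final String id;
        final long creationTime;

        Event(String id, long creationTime) {
            this.id = id;
            this.creationTime = creationTime;
        }
    }

    static final long MAX_OUT_OF_ORDERNESS = 3500; // 3.5 seconds, as in the example

    // Walks the events in arrival order and returns the ids of events whose own
    // timestamp is already covered by the watermark when they arrive (late events).
    // Simplification: the watermark is recomputed after every element, whereas
    // Flink polls a periodic assigner at a configured interval.
    static List<String> lateEvents(List<Event> arrivalOrder) {
        List<String> late = new ArrayList<>();
        long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS; // avoids underflow
        long watermark = Long.MIN_VALUE;
        for (Event e : arrivalOrder) {
            long timestamp = e.creationTime; // taken from the element, not from arrival order
            if (timestamp <= watermark) {
                late.add(e.id);
            }
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
            watermark = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
        }
        return late;
    }

    public static void main(String[] args) {
        // Arrival order differs from creation order: c and e arrive after newer events.
        List<Event> arrivals = Arrays.asList(
                new Event("a", 1000), new Event("b", 5000), new Event("c", 2000),
                new Event("d", 9000), new Event("e", 4000));
        System.out.println("late: " + lateEvents(arrivals));
    }
}
```

Running main prints `late: [e]`. Element c arrives after b but is only 3 s behind it, so it is still within the 3.5 s bound and keeps its own timestamp of 2000. Element e is 5 s behind the highest timestamp seen (9000), so it is already behind the watermark of 5500 when it arrives.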
Hope this helps.


assignTimestampsAndWatermarks can be called on a DataStream as well as directly on the Kafka source, for example:


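import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
    @Override
    public long extractAscendingTimestamp(MyType element) {
        // use the timestamp carried inside the record itself
        return element.eventTimestamp();
    }
});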
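AscendingTimestampExtractor is meant for sources whose timestamps never decrease. The plain-Java sketch below imitates that idea without depending on Flink; the class and method names are hypothetical. My understanding is that Flink's version emits a watermark of the latest timestamp minus 1 ms and, by default, only logs a warning when a timestamp goes backwards, but please check the Javadoc for your Flink version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of the "ascending timestamps" assumption.
class AscendingDemo {

    private long currentTimestamp = Long.MIN_VALUE;
    private final List<Long> violations = new ArrayList<>();

    // Returns the element's own timestamp; records it as a violation if it is
    // smaller than the highest timestamp seen so far (i.e. it went backwards).
    long extract(long elementTimestamp) {
        if (elementTimestamp >= currentTimestamp) {
            currentTimestamp = elementTimestamp;
        } else {
            violations.add(elementTimestamp);
        }
        return elementTimestamp;
    }

    // The watermark trails the highest timestamp seen by 1 ms.
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    List<Long> violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingDemo demo = new AscendingDemo();
        for (long ts : new long[] {1000, 2000, 1500, 3000}) {
            demo.extract(ts);
            System.out.println("ts=" + ts + " watermark=" + demo.currentWatermark());
        }
        System.out.println("violations=" + demo.violations());
    }
}
```

With the input 1000, 2000, 1500, 3000, the element at 1500 is recorded as a violation and the watermark stays at 1999 until 3000 arrives. This is why the ascending extractor only fits data that really is in order; for out-of-order data, a bounded-out-of-orderness assigner like the first example above is the safer choice.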







On 2019-08-07 15:47:41, "xiaohei.info" <xiaohei.i...@gmail.com> wrote:
>
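>hi, all:
>  When is the event-time timestamp attached to the data? From the API it looks like it is assigned after the Flink
> source receives the data, rather than being carried over from the real origin of the data (e.g. a mobile device). If I use a Kafka
> source, then according to the documentation the timestamp carried by Kafka is also just the time at which Kafka received the data.
>  
> So here is my question: taking Kafka as an example, if records are timestamped "in order" as they reach the queue, then data that "arrives out of order" still gets "increasing timestamps". If all later event-time processing is based on these timestamps, doesn't that lose the real-world meaning of event time?
>  Please point out anything I have misunderstood!
>  Best regards~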
