Re: context.timestamp null in keyedprocess function
Hi,

Could you share more information, e.g. the exception stack trace? Do you set the assigner on all the sources? I think you can modify the KeyedProcessFunction to print the messages whose timestamp is null.

Best,
Shengkai

On Wed, Jun 15, 2022 at 14:57, bat man wrote:
> Has anyone experienced this or has any clue?
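Shengkai's suggestion amounts to a null guard before the timestamp is used. In Flink, `KeyedProcessFunction.Context#timestamp()` is declared to return a boxed `Long`, and its Javadoc notes that it may be null (for example when the program runs on processing time). Writing `long ts = ctx.timestamp();` auto-unboxes that value and throws exactly the NPE described. Because `extractTimestamp` returns a primitive `long`, the null cannot come from the extractor itself. One possible source, in line with the question about assigners, is a record that reaches the function without a timestamp attached. The stdlib-only sketch below models only the guard logic outside Flink. The class `TimestampGuard`, its methods, and the message format are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical helper, modelled outside Flink. Context#timestamp() returns a
// boxed Long that may be null, so the value is checked before any unboxing.
final class TimestampGuard {

    private TimestampGuard() {}

    // Returns the timestamp if present, or an empty OptionalLong when the
    // record carries no timestamp (the case that otherwise causes the NPE).
    static OptionalLong timestampOf(Long ctxTimestamp) {
        return ctxTimestamp == null ? OptionalLong.empty() : OptionalLong.of(ctxTimestamp);
    }

    // Builds a diagnostic line for a record whose timestamp is missing, so the
    // offending record can be printed instead of failing the job.
    static String describeMissing(Object key, Object record) {
        return "Missing timestamp: key=" + key + ", record=" + record;
    }

    // Demonstrates the failure mode: unboxing a null Long throws an NPE.
    static boolean unboxingThrows(Long ctxTimestamp) {
        try {
            long ignored = ctxTimestamp; // auto-unboxing happens here
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

Inside `processElement`, the same check would run on the value of `ctx.timestamp()` before it is used. When it is empty, the record is logged (or sent to a side output) instead of being unboxed.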
Re: context.timestamp null in keyedprocess function
Has anyone experienced this or has any clue?

On Tue, Jun 14, 2022 at 6:21 PM bat man wrote:
> What could be the issue, anyone has any idea, because as far as timestamp
> goes it could only be null if the timestamp extractor sends null.
context.timestamp null in keyedprocess function
Hi,

We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and an enrichment stream from another topic.

We extend AssignerWithPeriodicWatermarks to assign watermarks, extract the timestamp from each event, and handle idle source partitions. AutoWatermarkInterval is set to 5000L, i.e. 5 seconds. The timestamp extractor looks like this:

    @Override
    public long extractTimestamp(Raw event, long previousElementTimestamp) {
        // Wall-clock arrival time of the last record (used for idle-partition handling).
        lastRecordProcessingTime = System.currentTimeMillis();
        // The raw timestamp is in epoch seconds; truncate it to whole seconds.
        long eventTimeSeconds = (long) Double.parseDouble(event.getTimestamp().toString());
        // Convert seconds to epoch milliseconds.
        long timestamp = eventTimeSeconds * 1_000L;
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

In the second step, the rules are joined to the events; this is done in a KeyedProcessFunction.

What we have observed is that sometimes, when the job starts consuming from the beginning of the event source stream, the timestamp accessed in the KeyedProcessFunction via context.timestamp() is null and the code throws an NPE. This happens only for some records, intermittently, and when we process the same event in another environment it is processed fine, which means the event itself is parsed correctly.

What could be the issue? Does anyone have an idea? As far as we can tell, the timestamp could only be null if the timestamp extractor returned null.

Thanks.
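The conversion step of the extractor can be pulled into plain Java and tested on its own, away from the cluster. The sketch below is a stdlib-only model, not Flink's API. It assumes `getTimestamp()` yields epoch seconds as a decimal string, as the `* 1_000` conversion implies. It also keeps millisecond precision by scaling with `BigDecimal` before truncating, whereas the extractor above truncates to whole seconds first; that is a deliberate design choice of the sketch. The class name `EventTimeExtraction` and its methods are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical stdlib-only model of the extractor logic (not Flink's API).
// Assumes the raw timestamp is epoch seconds, possibly fractional.
final class EventTimeExtraction {
    // Largest timestamp seen so far, mirroring currentMaxTimestamp above.
    private long currentMaxTimestamp = Long.MIN_VALUE;

    // Converts e.g. "1655200000.123" (seconds) to 1655200000123 (milliseconds).
    // BigDecimal avoids the rounding error a double multiplication can add.
    static long toEpochMillis(String rawSeconds) {
        if (rawSeconds == null || rawSeconds.trim().isEmpty()) {
            throw new IllegalArgumentException("missing event timestamp");
        }
        return new BigDecimal(rawSeconds.trim())
                .movePointRight(3)              // seconds -> milliseconds
                .setScale(0, RoundingMode.DOWN) // drop sub-millisecond digits
                .longValueExact();              // fail loudly on overflow
    }

    // Mirrors extractTimestamp: parse, update the running maximum, return.
    long extract(String rawSeconds) {
        long timestamp = toEpochMillis(rawSeconds);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    long currentMaxTimestamp() {
        return currentMaxTimestamp;
    }
}
```

Rejecting blank or unparsable input with an exception makes a bad record visible at the source, rather than letting it surface later as an unexplained failure downstream.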