Hi,

We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and
an enrichment stream from another topic.
We implement AssignerWithPeriodicWatermarks to assign watermarks, extract
the timestamp from each event, and handle idle source partitions.
The AutoWatermarkInterval is set to 5000L (5 seconds).
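To make the periodic-assigner pattern above concrete, here is a plain-Java model of it. This is a minimal sketch, not Flink's implementation and not the poster's code: the class name PeriodicWatermarkModel, the maxOutOfOrdernessMs and idleTimeoutMs parameters, and the idle rule (once no record has arrived for idleTimeoutMs, let event time advance by the idle wall-clock duration) are all assumptions. In the real job Flink would call getCurrentWatermark() every AutoWatermarkInterval milliseconds (5000 here); the model takes the current time as an argument so it can be exercised without a cluster.

```java
// Plain-Java model of a periodic watermark assigner with idle handling.
// Hypothetical sketch: names and the idle rule are assumptions, not Flink API.
class PeriodicWatermarkModel {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastRecordProcessingTime = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    PeriodicWatermarkModel(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Called per record: remember the wall-clock arrival time and the max event time seen.
    long onRecord(long eventTimestamp, long nowMillis) {
        lastRecordProcessingTime = nowMillis;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Called periodically (every AutoWatermarkInterval ms in the real job).
    long currentWatermark(long nowMillis) {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return lastEmittedWatermark; // no records yet, so no progress
        }
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        long idleFor = nowMillis - lastRecordProcessingTime;
        if (idleFor > idleTimeoutMs) {
            // Assumed idle rule: advance event time by the idle wall-clock duration.
            candidate += idleFor;
        }
        // Watermarks must never move backwards.
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }
}
```

With maxOutOfOrdernessMs = 1000 and idleTimeoutMs = 10000, a record at event time 5000 gives a watermark of 4000; after 14.8 s of silence the idle rule pushes it to 18800, and it stays there when newer records resume. Any idle rule carries this trade-off: records older than the advanced watermark become late.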
The timestamp extractor looks like this:

        @Override
        public long extractTimestamp(Raw event, long previousElementTimestamp) {
            // Record when we last saw an element; used for idle-partition handling.
            lastRecordProcessingTime = System.currentTimeMillis();
            // The raw field is treated as epoch seconds: truncate to whole
            // seconds, then convert to epoch milliseconds.
            long eventTimeSeconds = (long) Double.parseDouble(event.getTimestamp().toString());
            long timestamp = eventTimeSeconds * 1_000L;
            if (timestamp > currentMaxTimestamp) {
                currentMaxTimestamp = timestamp;
            }
            return timestamp;
        }
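Because the extractor truncates the parsed value to whole seconds before multiplying by 1,000, any sub-second part of the event timestamp is discarded. The sketch below contrasts that behaviour with an exact conversion via java.math.BigDecimal. It assumes, based on the `* 1_000` in the extractor, that the raw field holds epoch seconds as a decimal string; TimestampConversion and its method names are hypothetical. Both methods return a primitive long, which can never be null.

```java
import java.math.BigDecimal;

// Hypothetical helper contrasting two ways of turning an "epoch seconds" string into epoch millis.
class TimestampConversion {
    // Mirrors the extractor: drop the fractional seconds first, then scale to milliseconds.
    static long toMillisTruncatingSeconds(String epochSeconds) {
        return ((long) Double.parseDouble(epochSeconds)) * 1_000L;
    }

    // Keeps millisecond precision; BigDecimal avoids binary floating-point rounding.
    static long toMillisExact(String epochSeconds) {
        return new BigDecimal(epochSeconds).movePointRight(3).longValue();
    }
}
```

For "1612345678.9" the first method yields 1612345678000 and the second 1612345678900.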

In the second step, the rules are joined to the events in a
KeyedProcessFunction.
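The join itself is not shown. As a purely hypothetical illustration of the usual keyed-enrichment pattern, here is a plain-Java model: the latest rule per key is kept in a map (standing in for Flink keyed state), and events that arrive before their rule are buffered until it shows up. The class and method names, the string-typed keys, rules, and events, and the enrich format are all assumptions, not the poster's logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a keyed rule/event join; the maps stand in for keyed state.
class KeyedRuleJoinModel {
    private final Map<String, String> ruleByKey = new HashMap<>();
    private final Map<String, List<String>> pendingEvents = new HashMap<>();

    // A rule arrives: store it and release any events that were waiting for it.
    List<String> onRule(String key, String rule) {
        ruleByKey.put(key, rule);
        List<String> out = new ArrayList<>();
        List<String> waiting = pendingEvents.remove(key);
        if (waiting != null) {
            for (String event : waiting) {
                out.add(enrich(event, rule));
            }
        }
        return out;
    }

    // An event arrives: enrich it immediately if a rule is known, otherwise buffer it.
    List<String> onEvent(String key, String event) {
        String rule = ruleByKey.get(key);
        if (rule == null) {
            pendingEvents.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
            return Collections.emptyList();
        }
        return Collections.singletonList(enrich(event, rule));
    }

    private static String enrich(String event, String rule) {
        return event + "+" + rule;
    }
}
```

Buffering matters most in exactly the situation described here: when the job replays the event topic from the beginning, many events can arrive before the matching rule.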
We have observed that sometimes, when the job starts consuming from the
beginning of the event source stream, the timestamp returned by
context.timestamp() in the KeyedProcessFunction is null, and the code
throws a NullPointerException.
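In Flink's ProcessFunction and KeyedProcessFunction, Context#timestamp() returns a boxed Long, and its Javadoc notes that it may be null (for example under processing time). Assigning it to a primitive long therefore throws a NullPointerException through auto-unboxing whenever the current record carries no timestamp. The sketch below reproduces that in plain Java, with a Long parameter standing in for the context call. NullSafeTimestamp, its methods, and the fallback policy are hypothetical; whether a fallback is acceptable depends on the job.

```java
// Hypothetical plain-Java illustration; a Long parameter stands in for context.timestamp().
class NullSafeTimestamp {
    // Same as `long ts = ctx.timestamp();`: throws NullPointerException if ctxTimestamp is null.
    static long readUnboxed(Long ctxTimestamp) {
        long ts = ctxTimestamp; // auto-unboxing
        return ts;
    }

    // Guarded read: fall back to a caller-chosen value instead of failing.
    static long readOrDefault(Long ctxTimestamp, long fallback) {
        return ctxTimestamp != null ? ctxTimestamp : fallback;
    }
}
```

A guard like this only hides the symptom. Logging the key and record whenever the timestamp is null would help show which input is arriving without one.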
This happens intermittently and only for some records. When we process
the same event in another environment, it goes through fine, so the event
itself is being parsed correctly.

Does anyone have an idea what the issue could be? As far as we can tell,
the timestamp could only be null if the timestamp extractor returned null,
and extractTimestamp returns a primitive long.

Thanks.