Hi Anuj,

What is the parallelism of your source? Do all of your source tasks produce
records? An operator's watermark is always the minimum of the watermarks
received from all of its upstream parallel instances. Therefore, if some of
them do not produce records, the watermark will not progress. You can read
more about watermarks and how they work here:
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
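
To make the minimum rule concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink's internal code: the class WatermarkMinSketch and its methods are hypothetical names used only for illustration. It assumes two upstream channels, one of which never emits anything, and reuses the timestamps from your output with the 30000 ms offset from your assigner.

```java
import java.util.Arrays;

/** Toy model (not Flink code) of how an operator combines per-channel watermarks. */
public class WatermarkMinSketch {
    // One slot per upstream parallel instance. Each slot starts at
    // Long.MIN_VALUE, meaning "no watermark received from this channel yet".
    private final long[] channelWatermarks;

    public WatermarkMinSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the combined watermark. */
    public long onWatermark(int channel, long watermark) {
        // A channel's watermark never moves backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return currentWatermark();
    }

    /** The combined watermark is the minimum over all channels. */
    public long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkMinSketch operator = new WatermarkMinSketch(2);

        // Only channel 0 emits; channel 1 models a source task without records.
        System.out.println(operator.onWatermark(0, 1583128014000L - 30000L));
        System.out.println(operator.onWatermark(0, 1583128048000L - 30000L));
        // Both lines print -9223372036854775808 (Long.MIN_VALUE).

        // Once channel 1 emits a watermark as well, the minimum can advance.
        System.out.println(operator.onWatermark(1, 1583128000000L));
        // Prints 1583128000000.
    }
}
```

As long as one channel stays silent, the combined value remains Long.MIN_VALUE, which is -9223372036854775808. That appears to be the value in your output: your print statement writes four dashes, and the fifth one in the log would be the minus sign.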

Hope that helps.

Best,

Dawid

On 02/03/2020 16:26, aj wrote:
>
> I am trying to use a process function to do some processing on a set of
> events. I am using event time and a keyed stream. The issue I am facing is
> that the watermark value is always 9223372036854775808. I have added
> print statements to debug, and they show this:
>
> timestamp------1583128014000 extractedTimestamp 1583128014000
> currentwatermark-----9223372036854775808
>
> timestamp------1583128048000 extractedTimestamp 1583128048000
> currentwatermark-----9223372036854775808
>
> timestamp------1583128089000 extractedTimestamp 1583128089000
> currentwatermark-----9223372036854775808
>
> The timestamp and the extracted timestamp change, but the watermark is
> not updated. So no record gets into the queue, as context.timestamp is
> never less than the watermark.
>
>
> DataStream<GenericRecord> dataStream =
>     env.addSource(searchConsumer).name("search_list_keyless");
> DataStream<GenericRecord> dataStreamWithWaterMark =
>     dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
> try {
>     dataStreamWithWaterMark
>         .keyBy((KeySelector<GenericRecord, String>) record -> {
>             StringBuilder builder = new StringBuilder();
>             builder.append(record.get("session_id"));
>             builder.append(record.get("user_id"));
>             return builder.toString();
>         })
>         .process(new MatchFunction())
>         .print();
> } catch (Exception e) {
>     e.printStackTrace();
> }
> env.execute("start session process");
> }
>
> public static class SessionAssigner implements
>         AssignerWithPunctuatedWatermarks<GenericRecord> {
>     @Override
>     public long extractTimestamp(GenericRecord record,
>             long previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>         System.out.println("timestamp------" + timestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record,
>             long extractedTimestamp) {
>         // simply emit a watermark with every event
>         System.out.println("extractedTimestamp " + extractedTimestamp);
>         return new Watermark(extractedTimestamp - 30000);
>     }
> }
>
> @Override
> public void processElement(GenericRecord record, Context context,
>         Collector<Object> collector) throws Exception {
>     TimerService timerService = context.timerService();
>     System.out.println("currentwatermark----" + timerService.currentWatermark());
>     if (context.timestamp() > timerService.currentWatermark()) {
>         Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
>         PriorityQueue<GenericRecord> queue = queueval.f1;
>         long startTime = queueval.f0;
>         System.out.println("starttime----" + startTime);
>         if (queue == null) {
>             queue = new PriorityQueue<>(10, new TimeStampComprator());
>             startTime = (long) record.get("event_ts");
>         }
>         queueState.update(new Tuple2<>(startTime, queue));
>         timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
>     }
> }
> }
>
> Please help me to understand what I am doing wrong.
> -- 
> Thanks & Regards,
> Anuj Jain
>
>
>
