Following is the code for the policy. A fair bit of it handles idle-partition scenarios; for the current discussion, let's assume none of the partitions of a given job are idle and all are high-throughput partitions. For that case, very simply put, getTimestampForRecord() stores the event_time in a field on the per-partition policy instance (latestTimestampSeen), and getWatermark() emits the last stored latestTimestampSeen. event_time is read from the Kafka headers sent by the producer service.
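The header read inside getTimestampForRecord() (the headerOrRecordTimestamp helper, which is not shown in the policy below) looks roughly like the sketch here; the header name "event_time" and the epoch-millis encoding are illustrative assumptions, not necessarily what the producer actually sends:

// Illustrative sketch only: read an "event_time" header (assumed to hold epoch millis)
// from the Kafka record, falling back to the record's own timestamp when it is missing.
private Instant headerOrRecordTimestamp(KafkaRecord<K, V> record) {
  org.apache.kafka.common.header.Header header =
      record.getHeaders() == null ? null : record.getHeaders().lastHeader("event_time");
  if (header != null && header.value() != null) {
    long epochMillis =
        Long.parseLong(new String(header.value(), java.nio.charset.StandardCharsets.UTF_8));
    return new Instant(epochMillis);
  }
  // Fallback: the Kafka record timestamp (create time or log-append time, in millis).
  return new Instant(record.getTimestamp());
}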
public class KafkaHeaderTimestampPolicy<K, V> extends TimestampPolicy<K, V> {

  // --- State per partition instance of this policy ---

  /** Latest event-time timestamp observed (from header or record). */
  private Instant latestTimestampSeen = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /** Last watermark emitted (must be monotonic). Starts at MIN to avoid startup drops. */
  private Instant lastEmittedWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /** Wall-clock instant when this policy instance was created (partition assigned). */
  private final Instant createdAt = Instant.now();

  /** First time we actually saw any record for this partition. */
  private @Nullable Instant firstRecordAt = null;

  /** Last time we observed a record arrive (runner's wall clock via PartitionContext). */
  private @Nullable Instant lastRecordArrivalAt = null;

  /** Cold-start grace; set to allowedLateness. */
  private final Duration COLD_START_GRACE;

  /** Idle-advance safety margin; set to 2 x allowedLateness. */
  private final Duration IDLE_ADVANCE_GRACE;

  public KafkaHeaderTimestampPolicy(
      Optional<Instant> previousWatermark, Duration allowedLateness) {
    this.COLD_START_GRACE = allowedLateness;
    this.IDLE_ADVANCE_GRACE = allowedLateness.multipliedBy(2);
    // Seed from previous WM (e.g., after checkpoint restore). Otherwise stay at MIN to avoid
    // falsely dropping data at startup due to idle partitions.
    if (previousWatermark != null && previousWatermark.isPresent()) {
      this.lastEmittedWatermark = previousWatermark.get();
    }
  }

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
    // Read event_time from the Kafka headers (helper shown above).
    Instant ts = headerOrRecordTimestamp(record);
    if (ts.isAfter(latestTimestampSeen)) {
      latestTimestampSeen = ts;
    }
    if (firstRecordAt == null) {
      firstRecordAt = ctx.getBacklogCheckTime();
    }
    lastRecordArrivalAt = ctx.getBacklogCheckTime();
    return ts;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    final Instant nowish = ctx.getBacklogCheckTime(); // runner-supplied wall clock
    Instant candidate;

    // Case A: this partition has NEVER produced a record yet.
    if (firstRecordAt == null) {
      // During cold-start grace, keep this partition from advancing the watermark at all.
      // COLD_START_GRACE is set to allowedLateness.
      if (nowish.isBefore(createdAt.plus(COLD_START_GRACE))) {
        return emitMonotonic(lastEmittedWatermark); // likely MIN (or restored WM)
      }
      // After grace, treat as "forever idle" but DON'T jump to MAX.
      // Advance to (now - IDLE_ADVANCE_GRACE), so the global min can progress and
      // late starts within grace still won't be dropped.
      // IDLE_ADVANCE_GRACE is set to 2 x allowedLateness.
      Instant bound = nowish.minus(IDLE_ADVANCE_GRACE);
      return emitMonotonic(bound);
    }

    // Case B: we have produced at least once; the watermark normally follows the last seen event time.
    candidate = latestTimestampSeen;

    // If idle for long enough, allow the watermark to float forward with the wall clock
    // but keep a safety margin so we don't get ahead of potential stragglers.
    if (lastRecordArrivalAt != null) {
      // some more code: if the partition has been idle for a period of x mins,
      // the partition watermark floats around nowish - IDLE_ADVANCE_GRACE
    }

    return emitMonotonic(candidate);
  }

  private Instant emitMonotonic(Instant candidate) {
    if (candidate.isAfter(lastEmittedWatermark)) {
      lastEmittedWatermark = candidate;
    }
    return lastEmittedWatermark;
  }
}
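For reference, the policy is plugged into the pipeline roughly as sketched below; the bootstrap servers, topic list, deserializers, and the window/lateness values are placeholders rather than our real configuration:

// Sketch of the KafkaIO wiring; values here are placeholders.
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class PipelineWiring {
  public static void wire(Pipeline p, List<String> topics) {
    Duration allowedLateness = Duration.standardMinutes(10); // placeholder value

    PCollection<KafkaRecord<String, byte[]>> records =
        p.apply(
            KafkaIO.<String, byte[]>read()
                .withBootstrapServers("broker:9092")
                .withTopics(topics) // t1 ... tn, including t_withlag
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                // One policy instance is created per partition; previousWatermark is
                // present after a checkpoint restore and empty at a cold start.
                .withTimestampPolicyFactory(
                    (tp, previousWatermark) ->
                        new KafkaHeaderTimestampPolicy<>(previousWatermark, allowedLateness)));

    records.apply(
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardMinutes(5)))
            .withAllowedLateness(allowedLateness));
  }
}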
On Sun, Aug 24, 2025 at 12:53 PM Reuven Lax via user <user@beam.apache.org> wrote:

> Can you explain more about your custom watermark policy?
>
> On Sun, Aug 24, 2025 at 11:43 AM gaurav mishra <gauravmishra.it...@gmail.com> wrote:
>
>> Hi All,
>> We have a Dataflow job which is consuming from 100s of Kafka topics. This
>> job has time-based window aggregations. From time to time we see the job
>> dropping data due to lateness at pipeline start. Once the job reaches a
>> steady state, there is no data loss. We are using a custom timestamp policy
>> which moves the watermark based on the message's header/payload to stay in
>> the event-time domain.
>> For the data loss, the following is my theory; I need your input on whether
>> it is a valid possibility and, if so, whether there are any solutions to
>> this problem -
>>
>> Say there are n topics - t1, t2, ..., t_withlag, ..., tn - which are being
>> consumed from.
>> All topics have almost 0 backlog except for one, t_withlag.
>> The backlog can be assumed to be substantial, so that it goes beyond the
>> allowed lateness.
>>
>> Job starts -
>> 1) Bundle1 starts getting created, and let's say the bundle was filled only
>> from the first few topics, so the runner could not reach the laggy topic.
>> 2) The watermark policy is invoked for all topics' partitions which
>> participated in the above bundle.
>> 3) The runner computed the global watermark based on the above input.
>> 4) Since the laggy topic did not contribute to the watermark calculation,
>> the global watermark was set to ~now().
>> 5) The next bundle starts processing, and in this bundle the runner started
>> encountering messages from t_withlag and hence dropping data from those
>> partitions until it is caught up.
>>
>> Is this scenario possible? If yes, what are possible solutions to such a
>> situation?
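To make step 5 of the quoted scenario concrete, the drop condition at play is roughly the one sketched below (all values are made up for illustration):

// Illustration of window expiry: an element becomes droppably late once the watermark
// has passed the end of its window plus the allowed lateness.
// E.g., with 5-minute fixed windows and 10 minutes of allowed lateness, an event from
// t_withlag with event time 10:00 falls in window [10:00, 10:05); that window expires,
// and the event is dropped, once the global watermark passes 10:15.
Instant windowEnd        = Instant.parse("2025-08-24T10:05:00.000Z");
Duration allowedLateness = Duration.standardMinutes(10);
Instant globalWatermark  = Instant.parse("2025-08-24T12:00:00.000Z"); // jumped to ~now() after bundle 1
boolean droppedAsLate    = globalWatermark.isAfter(windowEnd.plus(allowedLateness)); // true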