Below is the code for the policy. It has a fair bit of code to handle
idle-partition scenarios; for the current discussion, let's assume that none of
the partitions of a given job are idle and that all of them are high-throughput
partitions. For that case, very simply put, getTimestampForRecord() stores the
record's event_time in a per-partition instance field (latestTimestampSeen), and
getWatermark() emits the last stored latestTimestampSeen. event_time is read
from the Kafka headers sent by the producer service.
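
For reference, the policy is plugged into KafkaIO through a TimestampPolicyFactory.
The snippet below is only a sketch of that wiring (bootstrapServers, topics, and
allowedLateness are placeholders rather than our exact setup); the previousWatermark
the runner passes to the factory is what seeds the constructor shown further down.

  // Sketch of the wiring (illustrative only):
  KafkaIO.<String, byte[]>read()
      .withBootstrapServers(bootstrapServers)
      .withTopics(topics)
      .withKeyDeserializer(StringDeserializer.class)
      .withValueDeserializer(ByteArrayDeserializer.class)
      // The factory is invoked once per assigned partition; the runner passes the
      // previously checkpointed watermark (if any) for that partition.
      .withTimestampPolicyFactory(
          (tp, previousWatermark) ->
              new KafkaHeaderTimestampPolicy<>(previousWatermark, allowedLateness));

The policy code: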


  // --- State per partition instance of this policy ---

  /** Latest event-time timestamp observed (from header or record). */
  private Instant latestTimestampSeen = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /** Last watermark emitted (must be monotonic). Starts at MIN to avoid startup drops. */
  private Instant lastEmittedWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /** Wall-clock instant when this policy instance was created (partition assigned). */
  private final Instant createdAt = Instant.now();

  /** First time we actually saw any record for this partition. */
  private @Nullable Instant firstRecordAt = null;

  /** Last time we observed a record arrive (runner's wall-clock via PartitionContext). */
  private @Nullable Instant lastRecordArrivalAt = null;

  public KafkaHeaderTimestampPolicy(
      Optional<Instant> previousWatermark, Duration allowedLateness) {

    // Seed from previous WM (e.g., after checkpoint restore). Otherwise stay at MIN
    // to avoid falsely dropping data at startup due to idle partitions.
    if (previousWatermark != null && previousWatermark.isPresent()) {
      this.lastEmittedWatermark = previousWatermark.get();
    }
  }

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {

    // Read event_time from the Kafka headers.
    Instant ts = headerOrRecordTimestamp(record);

    if (ts.isAfter(latestTimestampSeen)) {
      latestTimestampSeen = ts;
    }
    if (firstRecordAt == null) {
      firstRecordAt = ctx.getBacklogCheckTime();
    }
    lastRecordArrivalAt = ctx.getBacklogCheckTime();
    return ts;
  }


  @Override
  public Instant getWatermark(PartitionContext ctx) {
    final Instant nowish = ctx.getBacklogCheckTime(); // runner-supplied wall clock
    Instant candidate;

    // Case A: this partition has NEVER produced a record yet.
    if (firstRecordAt == null) {
      // During cold-start grace, keep this partition from advancing the watermark at all.
      // COLD_START_GRACE is set to allowedLateness.
      if (nowish.isBefore(createdAt.plus(COLD_START_GRACE))) {
        return emitMonotonic(lastEmittedWatermark); // likely MIN (or restored WM)
      }
      // After grace, treat as "forever idle" but DON'T jump to MAX.
      // Advance to (now - IDLE_ADVANCE_GRACE), so the global min can progress and
      // late starts within grace still won't be dropped.
      // IDLE_ADVANCE_GRACE is set to 2 x allowedLateness.
      Instant bound = nowish.minus(IDLE_ADVANCE_GRACE);
      return emitMonotonic(bound);
    }

    // Case B: we have produced at least once; the normal watermark follows the
    // last seen event time.
    candidate = latestTimestampSeen;

    // If idle for long enough, allow the watermark to float forward with wall clock,
    // but keep a safety margin so we don't get ahead of potential stragglers.
    if (lastRecordArrivalAt != null) {
      // Some more code (omitted here) that ensures that if the partition has been
      // idle for a period of X mins, the partition watermark floats around
      // (nowish - IDLE_ADVANCE_GRACE).
    }

    return emitMonotonic(candidate);
  }


  private Instant emitMonotonic(Instant candidate) {
    if (candidate.isAfter(lastEmittedWatermark)) {
      lastEmittedWatermark = candidate;
    }
    return lastEmittedWatermark;
  }
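
One piece not shown above is headerOrRecordTimestamp(). A minimal sketch of what it
does follows; the "event_time" header name and the epoch-millis-as-text encoding are
assumptions for illustration (Instant here is org.joda.time.Instant, which Beam's
TimestampPolicy API uses).

  // Sketch of the idea (the actual helper may differ): read event_time from the
  // Kafka headers and fall back to the Kafka record timestamp when the header is
  // missing or unparsable.
  private Instant headerOrRecordTimestamp(KafkaRecord<K, V> record) {
    org.apache.kafka.common.header.Headers headers = record.getHeaders();
    if (headers != null) {
      org.apache.kafka.common.header.Header h = headers.lastHeader("event_time");
      if (h != null && h.value() != null) {
        try {
          long epochMillis =
              Long.parseLong(new String(h.value(), java.nio.charset.StandardCharsets.UTF_8));
          return new Instant(epochMillis); // org.joda.time.Instant(long millis)
        } catch (NumberFormatException e) {
          // Fall through to the record timestamp.
        }
      }
    }
    return new Instant(record.getTimestamp()); // Kafka record timestamp (millis)
  }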

On Sun, Aug 24, 2025 at 12:53 PM Reuven Lax via user <user@beam.apache.org>
wrote:

> Can you explain more about your custom watermark policy?
>
> On Sun, Aug 24, 2025 at 11:43 AM gaurav mishra <
> gauravmishra.it...@gmail.com> wrote:
>
>> Hi All,
>> We have a Dataflow job which is consuming from 100s of Kafka topics. This
>> job has time-based window aggregations. From time to time we see the job
>> dropping data due to lateness at pipeline start. Once the job reaches a
>> steady state, there is no data loss. We are using a custom timestamp policy
>> which moves the watermark based on the message's header/payload to stay in
>> the event_time domain.
>> For the data loss, the following is my theory. I'd like your input on
>> whether that is a valid possibility and, if so, whether there are any
>> solutions to this problem:
>>
>> Say there are n topics being consumed from: t1, t2, ..., t_withlag, ..., tn.
>> All topics have almost zero backlog except for one, t_withlag.
>> The backlog can be assumed to be substantial, so that it goes beyond the
>> allowed lateness.
>>
>> Job starts:
>> 1) Bundle 1 starts getting created, and let's say the bundle was filled
>> only from the first few topics, so the runner could not reach the laggy
>> topic.
>> 2) The watermark policy is invoked for all topics' partitions which
>> participated in the above bundle.
>> 3) The runner computes the global watermark based on the above input.
>> 4) Since the laggy topic did not contribute to the watermark calculation,
>> the global watermark is set to ~now().
>> 5) The next bundle starts processing, and in this bundle the runner starts
>> encountering messages from t_withlag and hence drops data from those
>> partitions until the topic catches up.
>>
>> Is this scenario possible? If yes, what are possible solutions to such a
>> situation?
