Following is the code for the policy. It has a fair bit of code to handle
idle-partition scenarios; for the current discussion, let's assume none of
the partitions of a given job are idle and all are high-throughput
partitions. For that case, very simply put, getTimestampForRecord() stores
the event_time in a field on the policy instance (latestTimestampSeen), and
getWatermark() emits the last stored latestTimestampSeen. event_time is read
from the Kafka headers set by the producer service (a sketch of that
header-parsing helper is included after the policy code).
// --- State per partition instance of this policy ---

/** Latest event-time timestamp observed (from header or record). */
private Instant latestTimestampSeen = BoundedWindow.TIMESTAMP_MIN_VALUE;

/** Last watermark emitted (must be monotonic). Starts at MIN to avoid startup drops. */
private Instant lastEmittedWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

/** Wall-clock instant when this policy instance was created (partition assigned). */
private final Instant createdAt = Instant.now();

/** First time we actually saw any record for this partition. */
private @Nullable Instant firstRecordAt = null;

/** Last time we observed a record arrive (runner's wall clock via PartitionContext). */
private @Nullable Instant lastRecordArrivalAt = null;

public KafkaHeaderTimestampPolicy(
    Optional<Instant> previousWatermark, Duration allowedLateness) {
  // Seed from the previous WM (e.g., after checkpoint restore). Otherwise stay at MIN
  // to avoid falsely dropping data at startup due to idle partitions.
  if (previousWatermark != null && previousWatermark.isPresent()) {
    this.lastEmittedWatermark = previousWatermark.get();
  }
}

@Override
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
  // Read event_time from the Kafka headers.
  Instant ts = headerOrRecordTimestamp(record);
  if (ts.isAfter(latestTimestampSeen)) {
    latestTimestampSeen = ts;
  }
  if (firstRecordAt == null) {
    firstRecordAt = ctx.getBacklogCheckTime();
  }
  lastRecordArrivalAt = ctx.getBacklogCheckTime();
  return ts;
}

@Override
public Instant getWatermark(PartitionContext ctx) {
  final Instant nowish = ctx.getBacklogCheckTime(); // runner-supplied wall clock

  Instant candidate;

  // Case A: this partition has NEVER produced a record yet.
  if (firstRecordAt == null) {
    // During the cold-start grace period, keep this partition from advancing the
    // watermark at all. COLD_START_GRACE is set to allowedLateness.
    if (nowish.isBefore(createdAt.plus(COLD_START_GRACE))) {
      return emitMonotonic(lastEmittedWatermark); // likely MIN (or restored WM)
    }
    // After grace, treat as "forever idle" but DON'T jump to MAX.
    // Advance to (now - IDLE_ADVANCE_GRACE), so the global min can progress and
    // late starts within grace still won't be dropped.
    // IDLE_ADVANCE_GRACE is set to 2 x allowedLateness.
    Instant bound = nowish.minus(IDLE_ADVANCE_GRACE);
    return emitMonotonic(bound);
  }

  // Case B: we have produced at least once; the normal watermark follows the last
  // seen event time.
  candidate = latestTimestampSeen;

  // If idle for long enough, allow the watermark to float forward with the wall
  // clock, but keep a safety margin so we don't get ahead of potential stragglers.
  if (lastRecordArrivalAt != null) {
    // ... more code to ensure that if the partition has been idle for a period of
    // X minutes, the partition watermark floats around (nowish - IDLE_ADVANCE_GRACE).
  }

  return emitMonotonic(candidate);
}

private Instant emitMonotonic(Instant candidate) {
  if (candidate.isAfter(lastEmittedWatermark)) {
    lastEmittedWatermark = candidate;
  }
  return lastEmittedWatermark;
}
On Sun, Aug 24, 2025 at 12:53 PM Reuven Lax via user <[email protected]>
wrote:
> Can you explain more about your custom watermark policy?
>
> On Sun, Aug 24, 2025 at 11:43 AM gaurav mishra <
> [email protected]> wrote:
>
>> Hi All,
>> We have a Dataflow job which is consuming from 100s of Kafka topics. This
>> job has time-based window aggregations. From time to time we see the job
>> dropping data due to lateness at pipeline start. Once the job reaches a
>> steady state, there is no data loss. We are using a custom timestamp policy
>> which is moving the watermark based on the message's header/payload to stay
>> in the event_time domain.
>> For the data loss, following is my theory. I need your input on whether it
>> is a valid possibility and, if so, whether there are any solutions to this
>> problem:
>>
>> Say there are n topics, t1, t2, ..., t_withlag, ..., tn, which are being
>> consumed from.
>> All topics have almost zero backlog except for one, t_withlag.
>> The backlog can be assumed to be substantial, so that it goes beyond the
>> allowed lateness.
>>
>> The job starts:
>> 1) Bundle 1 starts getting created, and let's say the bundle was filled only
>> from the first few topics, so the runner could not reach the laggy topic.
>> 2) The watermark policy is invoked for all topics' partitions which
>> participated in the above bundle.
>> 3) The runner computes the global watermark based on the above input.
>> 4) Since the laggy topic did not contribute to the watermark calculation,
>> the global watermark is set to ~now().
>> 5) The next bundle starts processing, and in this bundle the runner starts
>> encountering messages from t_withlag and hence dropping data from those
>> partitions until it catches up.
>>
>> Is this scenario possible? If yes, what are possible solutions to such a
>> situation?
>>