Hi Jan and Luke,
Sorry for the late reply.
@Jan
I ended up implementing a timestamp policy like the one below:
private class AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy
    extends TimestampPolicy<String, GenericRecord> {

  protected Instant currentWatermark;

  public AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy(
      Optional<Instant> previousWatermark) {
    currentWatermark =
        previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(
      TimestampPolicy.PartitionContext ctx,
      KafkaRecord<String, GenericRecord> record) {
    currentWatermark = new Instant(record.getTimestamp());
    return currentWatermark;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    if (ctx.getMessageBacklog() == 0) {
      // The reader is caught up. May need to advance the watermark.
      return BoundedWindow.TIMESTAMP_MAX_VALUE;
    }
    // else, there is backlog (or it is unknown). Do not advance the watermark.
    return currentWatermark;
  }
}
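For completeness, this is roughly how such a policy gets plugged into the
read transform via withTimestampPolicyFactory (a sketch only; the
bootstrap server and topic names are placeholders, and deserializers and
other required settings are omitted):

```java
// The factory is invoked once per assigned partition; previousWatermark
// carries the watermark restored from a prior run, if any.
KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers("localhost:9092")  // placeholder
    .withTopic("state-topic")                // placeholder
    .withTimestampPolicyFactory(
        (partition, previousWatermark) ->
            new AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy(
                previousWatermark))
```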
This seems to work most of the time, but sometimes, when using
exactly-once semantics with KafkaIO and Flink, ctx.getMessageBacklog()
is always > 0, so the watermark is never advanced to
BoundedWindow.TIMESTAMP_MAX_VALUE.
I'll have to find a reliable way to reproduce that, though, and put
together a sample project.
@Luke
That could be a solution; however, I prefer to keep state in Kafka
instead of hashing, because there are cases where the id of (A1, B1, C1)
needs to be the same as the id for (A2, B1, C1), because A1 became A2
after some time (like a person legally changing their name, or a
financial security changing its symbol or CUSIP, etc.).
Thank you both for your suggestions and guidance.
Cheers,
Cristian
On Mon, Aug 9, 2021 at 6:32 PM Luke Cwik <[email protected]> wrote:
You could look at using a cryptographic hashing function such as
sha512 [1].
You would take the record (A1, B1, C1) encode it and pass it to
the hashing function to generate a binary hash which you could
then convert back to string via an encoding such as hex. The odds
of getting a collision are astronomically small (like you are more
likely to have a random bit flip due to a cosmic ray than for a
collision to happen).
This way you would never need to restore the ids from a previous
run by looking them up from an external source.
1:
https://stackoverflow.com/questions/33085493/how-to-hash-a-password-with-sha-512-in-java
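For reference, a minimal, self-contained sketch of the hashing approach
in plain Java (the class and method names and the field-joining encoding
here are illustrative, not from the thread):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class RecordHashId {
  /** Derives a stable hex id from a record's fields via SHA-512. */
  public static String idFor(String... fields) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-512");
      // Join with a delimiter so ("AB","C") and ("A","BC") hash differently.
      byte[] hash =
          digest.digest(String.join("\u0000", fields).getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder(hash.length * 2);
      for (byte b : hash) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      // SHA-512 is required to be present on every conforming JVM.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Same fields always map to the same id, across restarts and machines.
    System.out.println(idFor("A1", "B1", "C1").equals(idFor("A1", "B1", "C1"))); // true
    System.out.println(idFor("A1", "B1", "C1").equals(idFor("A1", "B1", "C2"))); // false
    System.out.println(idFor("A1", "B1", "C1").length()); // 128 hex chars for 512 bits
  }
}
```

Because the id is a pure function of the record's fields, no lookup
state is needed on restart; as noted above, though, this cannot return
the old id when a field's value changes over time.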
On Thu, Jul 22, 2021 at 7:40 AM Jan Lukavský <[email protected]> wrote:
Hi Cristian,
I didn't try that, so I'm not 100% sure it would work, but you could
probably try using a custom timestamp policy for the KafkaIO [1], which
will shift the timestamp to BoundedWindow.TIMESTAMP_MAX_VALUE once you
know you have reached the head of the state topic. That would probably
require reading the end offsets before running the Pipeline. This
should effectively turn the source into a bounded source.
Jan
[1]
https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-
On 7/22/21 2:14 PM, Cristian Constantinescu wrote:
> Hi All,
>
> I would like to know if there's a suggested pattern for the below
> scenario. TL;DR: reading state from Kafka.
>
> I have a scenario where I'm listening to a kafka topic and generate a
> unique id based on the properties of the incoming item. Then, I output
> the result to another kafka topic. The tricky part is that when the
> pipeline is restarted, I have to read the output topic and build up
> the ids state, so that if I see an item that was already given an id,
> I give the same id back and do not generate a new one.
>
> For example:
> Input topic -> Output topic
> (A1, B1, C1) -> (A1, B1, C1, Random string "ID 1")
> (A1, B1, C2) -> (A1, B1, C2, Random string "ID 2")
> pipeline is restarted
> (A3, B3, C3) -> (A3, B3, C3, Random string "ID 3")
> (A1, B1, C1) -> (A1, B1, C1, Random string "ID 1") <-- because we've
> already seen (A1, B1, C1) before
>
> I can't really use any type of windows except the global ones, as I
> need to join on all the items of the output topic (the one with the
> already generated ids).
>
> Right now, I flatten both input and output topics and I use a trigger
> on the global window,
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)),
> then group by properties (A, B, C). Once that is done, I look through
> the grouped rows and see if any one of them has an id already
> generated. If yes, all the other rows get this id and the id is saved
> in the ParDo's state for future messages. If not, I generate a new id.
>
> My solution seems to work. Kind of...
>
> This puts a delay of 10s on all the incoming messages. I'd prefer it
> weren't the case. I would like to read the output topic at the start
> of the pipeline, build the state, then start processing the input
> topic. Since the output topic will be stale until I start processing
> the input topic again, it is effectively a bounded collection.
> Unfortunately, because it's KafkaIO, it's still considered an
> unbounded source, which mainly means that Wait.on() on this collection
> waits forever. (Note: I've read the notes in the documentation [1] but
> either do not understand them or didn't take the appropriate steps for
> Wait.on to trigger properly.)
>
> I have also tried to window the output topic in a session window with
> a one-second gap. Basically, if I don't get any item for 1 second, it
> means that I finished reading the output topic and can start
> processing the input topic. Unfortunately, Wait.on() doesn't work for
> session windows.
>
> Furthermore, I don't think side inputs work for this problem. First,
> because I'm not sure how to create the side input from an unbounded
> source. Second, because the side input needs to be updated when a new
> id is generated.
>
> I would appreciate any thoughts or ideas to elegantly solve this
> problem.
>
> Thanks,
> Cristian
>
> [1]
> https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html