Hi Cristian,

I think that the backlog is not going to be 100% reliable for this use-case. A more robust approach would probably be to fetch the endOffsets from Kafka when submitting the job (or in appropriate time, depending on how do you get updates to the state topic, to make sure that you cannot miss something) and then compare these with offset in KafkaRecord in getTimestampForRecord. After all partitions reach the end offset, that should be the point when you can advance the watermark to infinity.

 Jan

On 9/10/21 9:14 AM, Cristian Constantinescu wrote:
Hi Jan and Luke,

Sorry for the late reply.

@Jan
I ended up implementing a timestamp policy like below

   private class AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy extends TimestampPolicy<String, GenericRecord> {
        protected Instant currentWatermark;

        public AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy(Optional<Instant> previousWatermark) {             currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(TimestampPolicy.PartitionContext ctx, KafkaRecord<String, GenericRecord> record) {
            currentWatermark = new Instant(record.getTimestamp());
            return currentWatermark;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            if (ctx.getMessageBacklog() == 0) {
                // The reader is caught up. May need to advance the watermark.
                return BoundedWindow.TIMESTAMP_MAX_VALUE;

            } // else, there is backlog (or is unknown). Do not advance the watermark.
            return currentWatermark;
        }
    }

This seems to work most of the time, but sometimes when using exactly once semantics with KafkaIO and Flink ctx.getMessageBacklog() is always > 0, so the watermark is not advanced to BoundedWindow.TIMESTAMP_MAX_VALUE.

I'll have to find a reliable way to reproduce that though and make a sample project.

@Luke
That could be a solution, however I prefer to keep state into kafka instead of hashing because there are cases where the id of (A1,B1,C1) is the same as the id for (A2, B1, C1) because A1 became A2 after some time (like a person legally changed their name, or a financial security changed its symbol or cusip, etc).

Thank you both for your suggestions and guidance.

Cheers,
Cristian

On Mon, Aug 9, 2021 at 6:32 PM Luke Cwik <[email protected] <mailto:[email protected]>> wrote:

    You could look at using a cryptographic hashing function such as
    sha512 [1].

    You would take the record (A1, B1, C1) encode it and pass it to
    the hashing function to generate a binary hash which you could
    then convert back to string via an encoding such as hex. The odds
    of getting a collision are astronomically small (like you are more
    likely to have a random bit flip due to a cosmic ray than for a
    collision to happen).

    This way you would never need to restore the ids from a previous
    run by looking them up from an external source.

    1:
    
https://stackoverflow.com/questions/33085493/how-to-hash-a-password-with-sha-512-in-java
    
<https://stackoverflow.com/questions/33085493/how-to-hash-a-password-with-sha-512-in-java>

    On Thu, Jul 22, 2021 at 7:40 AM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        Hi Cristian,

        I didn't try that, so I'm not 100% sure it would work, but you
        probably
        could try using custom timestamp policy for the KafkaIO, which
        will
        shift the timestamp to BoundedWindow.TIMESTAMP_MAX_VALUE, once
        you know
        you reached head of the state topic. That would probably
        require reading
        the end offsets before running the Pipeline. This should turn
        the source
        into bounded source effectively.

          Jan

        [1]
        
https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-
        
<https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->

        On 7/22/21 2:14 PM, Cristian Constantinescu wrote:
        > Hi All,
        >
        > I would like to know if there's a suggested pattern for the
        below
        > scenario. TL;DR: reading state from Kafka.
        >
        > I have a scenario where I'm listening to a kafka topic and
        generate a
        > unique id based on the properties of the incoming item.
        Then, I output
        > the result to another kafka topic. The tricky part is that
        when the
        > pipeline is restarted, I have to read the output topic and
        build up
        > the ids state, this way if I see an item that was already
        given an id,
        > I give the same id back and do not generate a new one.
        >
        > For example:
        > Input topic -> Output topic
        > (A1, B1, C1) -> (A1, B1, C1, Random string "ID 1")
        > (A1, B1, C2) -> (A1, B1, C2, Random string "ID 2")
        > pipeline is restarted
        > (A3, B3, C3) -> (A3, B3, C3, Random string "ID 3")
        > (A1, B1, C1) -> (A1, B1, C1, Random string "ID 1") <--
        because we've
        > already seen (A1, B1, C1) before
        >
        > I can't really use any type of windows except the global
        ones, as I
        > need to join on all the items of the output topic (the one
        with the
        > already generated ids).
        >
        > Right now, I flatten both input and output topics and I use
        a trigger
        > on the global window
        >
        
AfterProcessingTime.pastFirstElementInPane().plusDuration(Duration.standardSeconds(10)

        > then group by properties (A,B,C). Once that is done, I look
        through
        > the grouped rows and see if any one of them has an id already
        > generated. If yes, all the other rows get this id and the id
        is saved
        > in the ParDo's state for the future messages. If no, then
        generate a
        > new id.
        >
        > My solution seems to work. Kind of...
        >
        > This puts a delay of 10s on all the incoming messages. I'd
        prefer it
        > wouldn't be the case. I would like to read the output topic
        at the
        > start of the pipeline, build the state, then start
        processing the
        > input topic. Since the output topic will be stale until I start
        > processing the input topic again, it effectively is a
        > bounded collection. Unfortunately because it's kafkaIO, it's
        still
        > considered an unbounded source, which mainly means that
        Wait.on() this
        > collection waits forever. (Note: I've read the notes in the
        > documentation [1] but either do not understand them or
        didn't take the
        > appropriate steps for wait.on to trigger properly.)
        >
        > I have also tried to window the output topic in a session
        window with
        > a one second gap. Basically, if I don't get any item for 1
        second, it
        > means that I finished reading the output topic and can start
        > processing the input topic. Unfortunately Wait.on() doesn't
        work for
        > Session Windows.
        >
        > Furthermore, I don't think side inputs work for this
        problem. First
        > because I'm not sure how to create the side input from an
        unbounded
        > source. Second because the side input needs to be updated
        when a new
        > id is generated.
        >
        > I would appreciate any thoughts or ideas to elegantly solve
        this problem.
        >
        > Thanks,
        > Cristian
        >
        > [1]
        >
        
https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html
        
<https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html>

        >
        
<https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html
        
<https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html>>

Reply via email to