Hi Jonathan,
Is there a specific IO you have in mind? When I think about enforcing key
constraints, I think of SQL databases like PostgreSQL, MySQL, or AlloyDB.
In Beam those databases are supported through JdbcIO. The problem with
JdbcIO is that it doesn't yet have an error handler -
https://github.com/apache/beam/issues/20920.
So for now I would recommend writing to the database from within a ParDo
and handling errors the way you've described.
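
A minimal sketch of that pattern, assuming a PostgreSQL table
rows(id, identifier) with the UNIQUE index on identifier, and assuming
`rows` is a PCollection<KV<String, String>> keyed as (identifier, id). The
connection details and all names here are placeholders, not anything from
your setup:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<KV<String, String>> successTag =
    new TupleTag<KV<String, String>>() {};
final TupleTag<KV<String, String>> deadLetterTag =
    new TupleTag<KV<String, String>>() {};

PCollectionTuple results = rows.apply("WriteWithDeadLetter",
    ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
      // One connection per DoFn instance; pooling and retries are
      // omitted for brevity.
      private transient Connection conn;

      @Setup
      public void setup() throws SQLException {
        conn = DriverManager.getConnection(
            "jdbc:postgresql://host/db", "user", "pass");
      }

      @ProcessElement
      public void process(ProcessContext c) {
        KV<String, String> row = c.element();
        try (PreparedStatement ps = conn.prepareStatement(
            "INSERT INTO rows (id, identifier) VALUES (?, ?)")) {
          ps.setString(1, row.getValue());  // id
          ps.setString(2, row.getKey());    // identifier
          ps.executeUpdate();
          c.output(row);
        } catch (SQLException e) {
          // A unique-constraint violation (or any other SQL error) goes
          // to the dead letter output instead of failing the bundle.
          c.output(deadLetterTag, row);
        }
      }

      @Teardown
      public void teardown() throws SQLException {
        if (conn != null) conn.close();
      }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

PCollection<KV<String, String>> deadLetters = results.get(deadLetterTag);
// Write deadLetters to wherever your dead letter queue lives.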

If the duplicates can occur within the same pipeline, you could preprocess
the data and partition it (e.g. with a GroupByKey) on the identifier. That
way the worker processing a given identifier sees every record that shares
it and can pick one winner locally.
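
For example, keeping the KV<identifier, id> shape from the sketch above
(same imports, plus GroupByKey and java.util.Iterator; how you decide which
record wins is up to you):

import java.util.Iterator;
import org.apache.beam.sdk.transforms.GroupByKey;

PCollection<KV<String, String>> unique = rows
    .apply(GroupByKey.<String, String>create())
    .apply("OnePerIdentifier",
        ParDo.of(new DoFn<KV<String, Iterable<String>>,
                          KV<String, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            Iterator<String> ids = c.element().getValue().iterator();
            // Keep the first id seen for this identifier; the rest are
            // in-pipeline duplicates and could be emitted to a dead
            // letter output instead of being silently dropped.
            c.output(KV.of(c.element().getKey(), ids.next()));
          }
        }));

If you don't need to capture the duplicates anywhere, Beam's built-in
Distinct transform covers the simple drop-them case.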

The tricky part is 'at least once' processing, since each of your entries
may be reprocessed for some reason. In that case the insert should probably
be an upsert, or it should at least check whether the id also matches and
treat the write as a success when the same (id, identifier) pair was
already inserted.
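
For example (PostgreSQL's ON CONFLICT syntax; table and column names are
the same assumptions as above), a helper you could call from the ParDo
sketch instead of the plain INSERT:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Returns true on success, including the case where this exact
// (id, identifier) pair was already written by an earlier attempt;
// returns false only for a genuine clash on identifier.
static boolean insertIdempotent(Connection conn, String id,
    String identifier) throws SQLException {
  try (PreparedStatement ps = conn.prepareStatement(
      "INSERT INTO rows (id, identifier) VALUES (?, ?) "
          + "ON CONFLICT (identifier) DO NOTHING")) {
    ps.setString(1, id);
    ps.setString(2, identifier);
    if (ps.executeUpdate() > 0) {
      return true; // freshly inserted
    }
  }
  // Nothing was inserted, so a row with this identifier already exists.
  // Treat it as success only if it is our own (id, identifier) pair.
  try (PreparedStatement check = conn.prepareStatement(
      "SELECT 1 FROM rows WHERE id = ? AND identifier = ?")) {
    check.setString(1, id);
    check.setString(2, identifier);
    try (ResultSet rs = check.executeQuery()) {
      return rs.next();
    }
  }
}

Rows for which this returns false are the real duplicates and would go to
the dead letter output.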

On Tue, Apr 1, 2025 at 8:12 PM Jonathan Hope <
jonathan.douglas.h...@gmail.com> wrote:

> Hello, I had a question and was hoping this was the right place to ask.
>
> Let's say I'm moving data from a NoSQL database to a SQL database. Here is
> an example document in the NoSQL database:
>
> {
>   "id": "1234",
>   "identifier": "5678"
> }
>
> The id is system generated, and the identifier is user provided. This is
> being moved into a SQL database with two columns:
>
>    - id
>    - identifier
>
> In the SQL database there is a UNIQUE index on identifier, but the same
> constraint cannot be enforced on the NoSQL side. Now I could check for
> this like so:
>
>    1. Get source data
>    2. Check to see if identifier has already been inserted
>    3. Move duplicates to a dead letter queue
>    4. Write the data
>    5. Success
>
> But what could happen is:
>
>    1. Get source data
>    2. Check to see if identifier has already been inserted
>    3. Move duplicates to a dead letter queue
>    4. Another worker inserts a duplicate identifier
>    5. Write the data
>    6. Failure
>
>
> If I were doing this outside of the Beam context I would try the write,
> capture the errors, and then redirect the failures to some kind of dead
> letter queue. However, for the life of me I can't figure out how to do
> this in Beam. In cases where a failing write will never succeed on retry,
> and you can't reliably check for the cause of the failure ahead of time,
> what is the recommended pattern?
>
> Thanks!
>
