Of the supported Beam languages, the one I know best is Go, so that's what I've been working in. What I ended up doing was forking the databaseio code and changing it so that any rows that error on a write are emitted as one PCollection, with the corresponding errors emitted as another. I can then redirect the failed rows to a dead letter queue. I _think_ it's working (at least it appears to be, both locally and for small amounts of data in Dataflow).
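In case it helps anyone else, here is a stripped-down sketch of the idea as a plain ParDo rather than my actual databaseio fork. Row, FailedRow, the records table, and the pgx driver are all placeholders for whatever your schema actually uses:

package dlqwrite

import (
	"context"
	"database/sql"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"

	_ "github.com/jackc/pgx/v5/stdlib" // placeholder driver; swap in whatever matches your database
)

// Row is a stand-in for the pipeline's element type.
type Row struct {
	ID         string
	Identifier string
}

// FailedRow pairs the row with the write error so the dead letter
// queue has enough context to be inspected or replayed later.
type FailedRow struct {
	Row Row
	Err string
}

// writeFn attempts each insert and, instead of letting the bundle
// fail (and retry forever), emits rows that error to a second output.
type writeFn struct {
	DSN string // connection string, provided at construction time
	db  *sql.DB
}

func (f *writeFn) Setup(ctx context.Context) error {
	db, err := sql.Open("pgx", f.DSN)
	if err != nil {
		return err
	}
	f.db = db
	return nil
}

func (f *writeFn) ProcessElement(ctx context.Context, row Row, ok func(Row), failed func(FailedRow)) {
	_, err := f.db.ExecContext(ctx,
		`INSERT INTO records (id, identifier) VALUES ($1, $2)`, row.ID, row.Identifier)
	if err != nil {
		// Duplicate-key violations (and any other write error) go to
		// the DLQ output instead of failing the bundle.
		failed(FailedRow{Row: row, Err: err.Error()})
		return
	}
	ok(row)
}

func (f *writeFn) Teardown() error {
	if f.db != nil {
		return f.db.Close()
	}
	return nil
}

// WriteWithDLQ returns the successfully written rows and the failed
// rows as two separate PCollections.
func WriteWithDLQ(s beam.Scope, dsn string, rows beam.PCollection) (beam.PCollection, beam.PCollection) {
	return beam.ParDo2(s, &writeFn{DSN: dsn}, rows)
}

func init() {
	register.DoFn4x0[context.Context, Row, func(Row), func(FailedRow)](&writeFn{})
	register.Emitter1[Row]()
	register.Emitter1[FailedRow]()
}

The error is flattened to a string in FailedRow because error values don't encode cleanly across workers, and replaying the DLQ then just means reading the FailedRow records back and feeding the Row part through the same transform.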
You are definitely right that there will likely be some trickiness when replaying the dead letter queue. Obviously I'm hopeful that these kinds of errors will be very infrequent.

On Thu, Apr 3, 2025 at 1:35 AM Radek Stankiewicz <radosl...@google.com> wrote:

> hi Jonathan,
> Is there a specific IO in mind you would like to use? If I think about
> enforcing key constraints, then I think about SQL databases like pgsql,
> mysql, alloydb. In Beam those databases are supported in JDBCIO.
> The problem with JDBCIO is that it doesn't yet have an error handler -
> https://github.com/apache/beam/issues/20920.
> So temporarily I would recommend writing to the database from within a
> ParDo and handling errors like you've described.
>
> If your duplicates can be produced by the same pipeline, you could
> preprocess the data and partition (e.g. group by key) the data based on
> the identifier. This way the worker that is processing one identifier
> will also process all of its duplicates.
> The tricky part is when you need to handle "at least once" processing,
> as each of your entries may be reprocessed for some reason. In such a
> case, when inserting you should probably upsert, or at least check
> whether the id also matches and treat it as a success if the same
> (id, identifier) pair was already inserted.
>
> On Tue, Apr 1, 2025 at 8:12 PM Jonathan Hope <
> jonathan.douglas.h...@gmail.com> wrote:
>
>> Hello, I had a question and was hoping this was the right place to ask.
>>
>> Let's say I'm moving data from a NoSQL database to a SQL database. Here
>> is an example document in the NoSQL database:
>>
>> {
>>   "id": "1234",
>>   "identifier": "5678"
>> }
>>
>> The id is system generated, and the identifier is user provided. This
>> is being moved into a SQL database with two columns:
>>
>> - id
>> - identifier
>>
>> In the SQL database there is a UNIQUE index on identifier, but the same
>> constraint cannot be enforced on the NoSQL side. Now I could check for
>> this like so:
>>
>> 1. Get source data
>> 2. Check to see if identifier has already been inserted
>> 3. Move duplicates to a dead letter queue
>> 4. Write the data
>> 5. Success
>>
>> But what could happen is:
>>
>> 1. Get source data
>> 2. Check to see if identifier has already been inserted
>> 3. Move duplicates to a dead letter queue
>> 4. Another worker inserts a duplicate identifier
>> 5. Write the data
>> 6. Failure
>>
>> If I were doing this outside of the Beam context I would try the write,
>> capture the errors, and then redirect the failures to some kind of dead
>> letter queue. However, for the life of me I can't figure out how to do
>> this in Beam. In cases where a failing write will never succeed on
>> retry, and you can't reliably check for the cause of the failure ahead
>> of time, what is the recommended pattern?
>>
>> Thanks!