Hi,

Do I understand correctly that the long checkpointing times are caused by slow queries to the database? If so, async querying might resolve the issue on the Flink side, but the unnecessary load on the DB will remain.
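For the async route, here is a rough sketch of what the verification could look like with Flink's Async I/O API. This is only an illustration, not your actual code: the connection URL, the `time_series` table, the query, and the thread-pool size are all placeholder assumptions.

```java
// Hedged sketch: off-load the blocking "was it written?" JDBC check to an
// async operator so the main task thread (and checkpoint barriers) are not
// blocked by the poll. Connection URL and query are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class WriteVerifier extends RichAsyncFunction<String, String> {

    private transient Connection connection;
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open the connection once per task, not once per record.
        connection = DriverManager.getConnection("jdbc:postgresql://host/db"); // placeholder
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String name, ResultFuture<String> resultFuture) {
        // Run the blocking JDBC call on a separate thread pool and complete
        // the future when it returns.
        CompletableFuture.supplyAsync(() -> existsInDb(name), executor)
            .thenAccept(found -> {
                if (found) {
                    resultFuture.complete(Collections.singleton(name));
                } else {
                    // Not written yet: emit nothing (retry logic would go here).
                    resultFuture.complete(Collections.<String>emptyList());
                }
            });
    }

    private boolean existsInDb(String name) {
        try (PreparedStatement ps = connection.prepareStatement(
                "SELECT 1 FROM time_series WHERE name = ?")) { // placeholder query
            ps.setString(1, name);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws Exception {
        if (executor != null) executor.shutdown();
        if (connection != null) connection.close();
    }
}
```

Wiring it in would look roughly like this (60s timeout, at most 100 in-flight checks):

AsyncDataStream.unorderedWait(toVerify, new WriteVerifier(), 60, TimeUnit.SECONDS, 100);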
Instead, maybe you can use CDC to stream DB changes and send messages to ActiveMQ when necessary [1][2]? Another option is to implement a custom JDBC-writing function (it doesn't have to be a sink function) that sends a message once the relevant write succeeds. This can be achieved by overriding the JdbcBatchingOutputFormat.attemptFlush method [3].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
[2] https://github.com/ververica/flink-cdc-connectors
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java//org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.html#attemptFlush--

Regards,
Roman

On Mon, Sep 27, 2021 at 7:44 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
>
> In my Flink Job, I am using event time to process time-series data.
>
> Due to our business requirements, I need to verify that a specific subset of
> data written to a JDBC sink has been written before I send an ActiveMQ
> message to another component.
>
> My job flows like this:
>
> 1. Kafka source
> 2. Split source messages with a flat map
> 3. Aggregate messages in a 15-minute window (keyed by the timestamp rounded
> up to the nearest quarter, and by name; note that there are 120,000 names)
> 4. Insert, forward-fill, or back-fill time-series data (keyed by name; again,
> there are 120,000 names). Forward fills are done through an event-time
> timer. Collect the range of data processed in a side output.
> 5. In a window function, determine when the time series (rounded up to the
> nearest quarter) aligned to the same quarter
> 6. Verify that a subset of the aligned time series has already been written
> to the database (keyed by name, and there are 120,000 of them) and collect
> an ActiveMQ message when that happens
>
> I could not find a good way to verify that data was written to the database,
> so I introduced a KeyedProcessFunction with a timer that creates a JDBC
> connection and then polls the database to verify the write. If the first
> attempt fails, it uses a processing-time timer to check again a minute
> later. Please keep in mind that there are 120,000 keys, but only about
> 1,000 records need this database verification.
>
> This approach caused checkpoints to take 2-4 hours. Previously, checkpoint
> times were only a few seconds.
>
> I am experimenting with a RichAsyncFunction and the R2DBC Postgres async
> driver instead. My R2DBC async code also has a 1-minute timer in it. So far
> this async approach fails.
>
> I feel as though I am at a crossroads. These are my options:
>
> 1. Continue to tune checkpoints to work with my blocking JDBC calls in a
> KeyedProcessFunction that polls for database writes.
>
> 2. Keep experimenting with a rich async function that does the
> verification.
>
> 3. Write the data that needs to be verified to another Kafka topic and have
> another Flink job do the verification. It would only need to do that with
> approximately 1,000 records every 15 minutes.
>
> Does anybody else have other ideas I can use to solve this?
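To make the attemptFlush suggestion [3] concrete, a rough sketch is below. Note that JdbcBatchingOutputFormat is marked internal in Flink 1.13, so this may break across versions; the constructor arguments are elided (forward the parent's), and publishToActiveMq() is a hypothetical helper, not a real API.

```java
// Rough sketch only: JdbcBatchingOutputFormat is internal API in Flink 1.13.
// The key idea: super.attemptFlush() throws SQLException when the batch
// fails, so the notification line is reached only after the rows actually
// reached the database.
import java.sql.SQLException;

import org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;

public class NotifyingJdbcOutputFormat<In, JdbcIn, E extends JdbcBatchStatementExecutor<JdbcIn>>
        extends JdbcBatchingOutputFormat<In, JdbcIn, E> {

    // Constructor omitted: forward the same arguments the parent class takes.

    @Override
    protected void attemptFlush() throws SQLException {
        super.attemptFlush();   // throws if the batch did not reach the DB
        publishToActiveMq();    // hypothetical helper: notify only on success
    }

    private void publishToActiveMq() {
        // send the "data is written" message here
    }
}
```

This removes the polling entirely: the notification is driven by the write itself rather than by a timer that re-queries the database.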