This commit rewrites most of bin/rpki-rtr/rtr-update.c to accomplish
these goals:
 1. Use the schema introduced a few commits ago.
 2. Use libdb (from lib/db/) for database interaction instead of the
    scm library (from lib/rpki).

WARNING: This is one of a series of commits to use the updated schema.
It is not the last in the series, so the code is still broken.

addresses [#7]
---
 bin/rpki-rtr/rtr-update.c | 467 +++++++++++++------------------------
 lib/db/clients/rtr.c      | 570 ++++++++++++++++++++++++++++++++++++++++++++++
 lib/db/clients/rtr.h      | 134 +++++++++++
 lib/db/prep-stmt.c        |  92 ++++++++
 lib/db/prep-stmt.h        |  14 ++
 lib/db/util.c             |  89 ++++++++
 lib/db/util.h             |  88 +++++++
 lib/rpki-rtr/pdu.h        |   2 +
 lib/rpki/querySupport.c   |   2 +
 lib/util/macros.h         |   8 +
 mk/libdb.mk               |   1 +
 mk/rpki-rtr.mk            |   2 +-
 12 files changed, 1164 insertions(+), 305 deletions(-)

diff --git a/bin/rpki-rtr/rtr-update.c b/bin/rpki-rtr/rtr-update.c
index 3a19da5..d1c49c6 100644
--- a/bin/rpki-rtr/rtr-update.c
+++ b/bin/rpki-rtr/rtr-update.c
@@ -3,129 +3,32 @@
  ***********************/
 
 #include "util/logging.h"
+#include "db/connect.h"
+#include "db/clients/rtr.h"
 #include "config/config.h"
-#include "rpki/err.h"
-#include "rpki/scmf.h"
-#include "rpki/querySupport.h"
 #include <stdio.h>
 #include <string.h>
+#include <stdbool.h>
 #include <stdlib.h>
 #include <limits.h>
 #include <inttypes.h>
 #include <time.h>
 
-static scm *scmp = NULL;
-static scmcon *connection = NULL;
-static scmsrcha *roaSrch = NULL;
-static scmtab *roaTable = NULL;
-static scmtab *sessionTable = NULL;
-static scmtab *fullTable = NULL;
-static scmtab *updateTable = NULL;
-static scmsrcha *snSrch = NULL;
-
-// serial number of this and previous update
-static unsigned int prevSerialNum,
-    currSerialNum,
-    lastSerialNum;
-
-static void setupSnQuery(
-    scm * scmp)
-{
-    snSrch = newsrchscm(NULL, 1, 0, 1);
-    addcolsrchscm(snSrch, "serial_num", SQL_C_ULONG, 8);
-    snSrch->wherestr = NULL;
-    updateTable = findtablescm(scmp, "rtr_update");
-    if (updateTable == NULL)
-        printf("Cannot find table rtr_update\n");
-}
-
-/*
- * helper function for getLastSerialNumber 
- */
-static int setLastSN(
-    scmcon * conp,
-    scmsrcha * s,
-    ssize_t numLine)
-{
-    (void)conp;
-    (void)numLine;
-    lastSerialNum = *((unsigned int *) (s->vec[0].valptr));
-    return -1;                  // stop after first row
-}
-
-/****
- * find the serial number from the most recent update
- ****/
-static unsigned int getLastSerialNumber(
-    scmcon * connect,
-    scm * scmp)
-{
-    lastSerialNum = 0;
-    if (snSrch == NULL)
-        setupSnQuery(scmp);
-    searchscm(connect, updateTable, snSrch, NULL, setLastSN,
-              SCM_SRCH_DOVALUE_ALWAYS | SCM_SRCH_BREAK_VERR,
-              "create_time desc");
-    return lastSerialNum;
-}
-
-
-/******
- * callback that writes the data from a ROA into the update table
- *   if the ROA is valid
- *****/
-static int writeROAData(
-    scmcon * conp,
-    scmsrcha * s,
-    ssize_t numLine)
-{
-    unsigned int asn = *((unsigned int *) s->vec[0].valptr);
-    char *ptr = (char *)s->vec[1].valptr,
-        *end;
-    char msg[1024];
-    int sta;
-
-    UNREFERENCED_PARAMETER(conp);
-    UNREFERENCED_PARAMETER(numLine);
-
-    if (!checkValidity((char *)s->vec[2].valptr, 0, scmp, connection))
-        return -1;
-    while ((end = strstr(ptr, ", ")) != NULL)
-    {
-        end[0] = '\0';
-        end[1] = '\0';
-        snprintf(msg, sizeof(msg),
-                 "insert ignore into %s values (%u, %u, \"%s\");",
-                 fullTable->tabname, currSerialNum, asn, ptr);
-        sta = statementscm_no_data(connection, msg);
-        checkErr(sta < 0, "Can't insert into %s", fullTable->tabname);
-        ptr = end + 2;
-    }
-    if (ptr[0] != '\0')
-    {
-        snprintf(msg, sizeof(msg),
-                 "insert ignore into %s values (%u, %u, \"%s\");",
-                 fullTable->tabname, currSerialNum, asn, ptr);
-        sta = statementscm_no_data(connection, msg);
-        checkErr(sta < 0, "Can't insert into %s", fullTable->tabname);
-    }
-    return 1;
-}
-
 
 int main(
     int argc,
     char **argv)
 {
-    char msg[1024];
-    int sta;
-    unsigned int session_count;
-    unsigned int update_count;
-    unsigned int update_had_changes;    // whether there are any changes from
-                                        // prevSerialNum to currSerialNum
-    unsigned int dont_proceed;
-    int first_time = 0;
-    int force_update = 0;
+    int ret = EXIT_SUCCESS;
+    bool done_db_init = false;
+    bool done_db_thread_init = false;
+    dbconn * db = NULL;
+
+    bool first_time;
+    bool force_update = false;
+    bool update_had_changes;
+    serial_number_t previous_serial;
+    serial_number_t current_serial;
 
     if (argc < 1 || argc > 2)
     {
@@ -147,77 +50,72 @@ int main(
     }
 
     // initialize the database connection
-    scmp = initscm();
-    checkErr(scmp == NULL, "Cannot initialize database schema\n");
-    connection = connectscm(scmp->dsn, msg, sizeof(msg));
-    checkErr(connection == NULL, "Cannot connect to database: %s\n", msg);
-
-    sessionTable = findtablescm(scmp, "rtr_session");
-    checkErr(sessionTable == NULL, "Cannot find table rtr_session\n");
-
-    sta = newhstmt(connection);
-    checkErr(!SQLOK(sta), "Can't create a new statement handle\n");
-    sta = statementscm(connection, "SELECT COUNT(*) FROM rtr_session;");
-    checkErr(sta < 0, "Can't query rtr_session\n");
-    sta = getuintscm(connection, &session_count);
-    pophstmt(connection);
-    checkErr(sta < 0, "Can't get results of querying rtr_session\n");
-    if (session_count == 0)
+    if (!db_init())
     {
-        LOG(LOG_ERR, "The rpki-rtr database isn't initialized.");
-        LOG(LOG_ERR, "See %s-rpki-rtr-initialize.", PACKAGE_NAME);
-        return EXIT_FAILURE;
+        LOG(LOG_ERR, "Could not initialize database program.");
+        ret = EXIT_FAILURE;
+        goto done;
     }
-    else if (session_count != 1)
+    done_db_init = true;
+
+    if (!db_thread_init())
+    {
+        LOG(LOG_ERR, "Could not initialize database thread.");
+        ret = EXIT_FAILURE;
+        goto done;
+    }
+    done_db_thread_init = true;
+
+    db = db_connect_default(DB_CLIENT_RTR);
+    if (db == NULL)
     {
         LOG(LOG_ERR,
-            "The rtr_session table has %u entries, which should never happen.",
-            session_count);
-        LOG(LOG_ERR,
-            "Consider running %s-rpki-rtr-clear and %s-rpki-rtr-initialize.",
-            PACKAGE_NAME, PACKAGE_NAME);
+            "Could not connect to the database, check your config "
+            "file.");
+        ret = EXIT_FAILURE;
+        goto done;
+    }
+
+
+    if (!db_rtr_has_valid_session(db))
+    {
         return EXIT_FAILURE;
     }
 
-    sta = newhstmt(connection);
-    checkErr(!SQLOK(sta), "Can't create a new statement handle\n");
-    sta = statementscm(connection, "SELECT COUNT(*) FROM rtr_update;");
-    checkErr(sta < 0, "Can't query rtr_update\n");
-    sta = getuintscm(connection, &update_count);
-    pophstmt(connection);
-    checkErr(sta < 0, "Can't get results of querying rtr_update\n");
-    if (update_count <= 0)
-        first_time = 1;
-
-    // delete any updates that weren't completed
-    sta = statementscm_no_data(connection,
-                               "delete rtr_incremental\n"
-                               "from rtr_incremental\n"
-                               "left join rtr_update on rtr_incremental.serial_num = rtr_update.serial_num\n"
-                               "where rtr_update.serial_num is null;");
-    checkErr(sta < 0, "Can't remove unfinished entries from rtr_incremental");
-
-    sta = statementscm_no_data(connection,
-                               "delete rtr_full\n"
-                               "from rtr_full\n"
-                               "left join rtr_update on rtr_full.serial_num = rtr_update.serial_num\n"
-                               "where rtr_update.serial_num is null;");
-    checkErr(sta < 0, "Can't remove unfinished entries from rtr_full");
-
-    // find the last serial number
-    if (first_time)
+    // Get the previous serial number.
+    switch (db_rtr_get_latest_sernum(db, &previous_serial))
     {
-        srandom((unsigned int)time(NULL));
-        prevSerialNum = (unsigned int) random();
+        case GET_SERNUM_SUCCESS:
+            first_time = false;
+            // previous_serial was set by db_rtr_get_latest_sernum
+            break;
+
+        case GET_SERNUM_NONE:
+            first_time = true;
+            // Set previous_serial to a pseudo-random number
+            srandom((unsigned int)time(NULL));
+            previous_serial = (serial_number_t)random();
+            break;
+
+        case GET_SERNUM_ERR:
+        default:
+            LOG(LOG_ERR, "Error finding latest serial number.");
+            ret = EXIT_FAILURE;
+            goto done;
     }
-    else
+
+    if (!db_rtr_delete_incomplete_updates(db))
     {
-        prevSerialNum = getLastSerialNumber(connection, scmp);
+        LOG(LOG_ERR, "Error deleting incomplete updates.");
+        ret = EXIT_FAILURE;
+        goto done;
     }
+
+    // Get/compute the current serial number.
     if (argc > 1)
     {
-        force_update = 1;
-        if (sscanf(argv[1], "%" SCNu32, &currSerialNum) != 1)
+        force_update = true;
+        if (sscanf(argv[1], "%" SCNSERIAL, &current_serial) != 1)
         {
             fprintf(stderr,
                     "Error: next serial number must be a nonnegative integer\n");
@@ -226,136 +124,99 @@ int main(
     }
     else
     {
-        currSerialNum = (prevSerialNum == UINT_MAX) ? 0 : (prevSerialNum + 1);
+        // NOTE: this relies on unsigned integer wrap-around to zero
+        current_serial = previous_serial + 1;
     }
 
-    if (!first_time)
+    // Make sure we're not about to overwrite current_serial, create a
+    // loop, or start a diverging history, even though these should be
+    // *really* unlikely.
+    if (!first_time &&
+        !db_rtr_good_serials(db, previous_serial, current_serial))
     {
-        // make sure we're not about to overwrite currSerialNum, create a
-        // loop,
-        // or start a diverging history, even though these should be *really*
-        // unlikely
-        sta = newhstmt(connection);
-        checkErr(!SQLOK(sta), "Can't create a new statement handle\n");
-        snprintf(msg, sizeof(msg),
-                 "SELECT COUNT(*) > 0 FROM rtr_update WHERE\n"
-                 "serial_num = %u OR prev_serial_num = %u OR prev_serial_num = %u;",
-                 currSerialNum, currSerialNum, prevSerialNum);
-        sta = statementscm(connection, msg);
-        checkErr(sta < 0, "Can't query rtr_update for unusual corner cases\n");
-        sta = getuintscm(connection, &dont_proceed);
-        pophstmt(connection);
-        checkErr(sta < 0,
-                 "Can't get results of querying rtr_update for unusual corner cases\n");
-
         if (argc > 1)
         {
-            checkErr(dont_proceed,
-                     "Error: rtr_update is full or in an unusual state, or the specified next serial number already exists\n");
+            LOG(LOG_ERR,
+                "Error: rtr_update is full or in an unusual state, "
+                "or the specified next serial number already "
+                "exists.");
         }
         else
         {
-            checkErr(dont_proceed,
-                     "Error: rtr_update table is either full or in an unusual state\n");
+            LOG(LOG_ERR,
+                "Error: rtr_update table is either full or in an "
+                "unusual state.");
         }
+
+        ret = EXIT_FAILURE;
+        goto done;
     }
 
-    // setup up the query if this is the first time
-    // note that the where string is set to only select valid roa's, where
-    // the definition of valid is given by the configuration file
-    if (roaSrch == NULL)
+    if (!db_rtr_insert_full(db, current_serial))
     {
-        QueryField *field;
-        roaSrch = newsrchscm(NULL, 3, 0, 1);
-        field = findField("asn");
-        addcolsrchscm(roaSrch, "asn", field->sqlType, field->maxSize);
-        field = findField("ip_addrs");
-        addcolsrchscm(roaSrch, "ip_addrs", field->sqlType, field->maxSize);
-        field = findField("ski");
-        addcolsrchscm(roaSrch, "ski", field->sqlType, field->maxSize);
-        roaSrch->wherestr[0] = 0;
-        addQueryFlagTests(roaSrch->wherestr, 0);
-        roaTable = findtablescm(scmp, "roa");
-        checkErr(roaTable == NULL, "Cannot find table roa\n");
-        fullTable = findtablescm(scmp, "rtr_full");
-        checkErr(fullTable == NULL, "Cannot find table rtr_full\n");
+        LOG(LOG_ERR, "Could not copy current RPKI state.");
+        ret = EXIT_FAILURE;
+        goto done;
     }
 
-    // write all the data into the database (done writing "full")
-    sta = searchscm(connection, roaTable, roaSrch, NULL,
-                    writeROAData, SCM_SRCH_DOVALUE_ALWAYS, NULL);
-    checkErr(sta < 0 && sta != ERR_SCM_NODATA, "searchscm for ROAs failed\n");
-
-    if (!first_time)
+    if (!first_time &&
+        !db_rtr_insert_incremental(db, previous_serial, current_serial))
     {
-        char differences_query_fmt[] =
-            "INSERT INTO rtr_incremental (serial_num, is_announce, asn, ip_addr)\n"
-            "SELECT %u, %d, t1.asn, t1.ip_addr\n"
-            "FROM rtr_full AS t1\n"
-            "LEFT JOIN rtr_full AS t2 ON t2.serial_num = %u AND t2.asn = t1.asn AND t2.ip_addr = t1.ip_addr\n"
-            "WHERE t1.serial_num = %u AND t2.serial_num IS NULL;";
-
-        // announcements
-        snprintf(msg, sizeof(msg), differences_query_fmt,
-                 currSerialNum, 1, prevSerialNum, currSerialNum);
-        sta = statementscm_no_data(connection, msg);
-        checkErr(sta < 0,
-                 "Can't populate rtr_incremental with announcements from serial number %u to %u",
-                 prevSerialNum, currSerialNum);
-
-        // withdrawals
-        snprintf(msg, sizeof(msg), differences_query_fmt,
-                 currSerialNum, 0, currSerialNum, prevSerialNum);
-        sta = statementscm_no_data(connection, msg);
-        checkErr(sta < 0,
-                 "Can't populate rtr_incremental with withdrawals from serial number %u to %u",
-                 prevSerialNum, currSerialNum);
+        LOG(LOG_ERR, "Could not compute incremental changes.");
+        ret = EXIT_FAILURE;
+        goto done;
     }
 
-    // write the current serial number and time, making the data available
     if (first_time)
     {
-        update_had_changes = 1;
-
-        snprintf(msg, sizeof(msg),
-                 "insert into rtr_update values (%u, NULL, now(), true);",
-                 currSerialNum);
+        update_had_changes = true;
     }
     else
     {
-        sta = newhstmt(connection);
-        checkErr(!SQLOK(sta), "Can't create a new statement handle\n");
-        snprintf(msg, sizeof(msg),
-                 "SELECT COUNT(*) > 0 FROM rtr_incremental WHERE serial_num = %u;",
-                 currSerialNum);
-        sta = statementscm(connection, msg);
-        checkErr(sta < 0,
-                 "Can't query rtr_incremental to find out if there are any changes\n");
-        sta = getuintscm(connection, &update_had_changes);
-        pophstmt(connection);
-        checkErr(sta < 0,
-                 "Can't get results of querying rtr_incremental to find out if there are any changes\n");
-
-        snprintf(msg, sizeof(msg),
-                 "insert into rtr_update values (%u, %u, now(), true);",
-                 currSerialNum, prevSerialNum);
+        switch (db_rtr_has_incremental_changes(db, current_serial))
+        {
+            case 1:
+                update_had_changes = true;
+                break;
+
+            case 0:
+                update_had_changes = false;
+                break;
+
+            case -1:
+            default:
+                LOG(LOG_ERR,
+                    "Error determining if there were any changes.");
+                ret = EXIT_FAILURE;
+                goto done;
+        }
     }
 
-    // msg should now contain a statement to make updates available
     if (update_had_changes || force_update)
     {
-        sta = statementscm_no_data(connection, msg);
-        checkErr(sta < 0, "Can't make updates available");
+        // Make the new serial number available for use.
+        if (
+            !db_rtr_insert_update(db, current_serial, previous_serial,
+                first_time))
+        {
+            LOG(LOG_ERR, "Error making updates available.");
+            ret = EXIT_FAILURE;
+            goto done;
+        }
     }
     else
     {
-        fprintf(stderr,
-                "Note: data had no changes since the last update, so no update was made.\n");
+        LOG(LOG_INFO,
+            "Data had no changes since the last update, so no update "
+            "was made.");
 
-        snprintf(msg, sizeof(msg),
-                 "delete from rtr_full where serial_num = %u;", currSerialNum);
-        sta = statementscm_no_data(connection, msg);
-        checkErr(sta < 0, "Can't delete duplicate data in rtr_full");
+        // The new data in rtr_full is useless, so delete it.
+        if (!db_rtr_delete_full(db, current_serial))
+        {
+            LOG(LOG_ERR, "Error deleting duplicate data in rtr_full.");
+            ret = EXIT_FAILURE;
+            goto done;
+        }
 
         // there's nothing to delete from rtr_incremental
     }
@@ -367,45 +228,43 @@ int main(
     // NOTE: The order of these updates and deletes is important.
     // All data must be marked as unusable according to rtr_update
     // before it is deleted from rtr_full or rtr_incremental.
-    snprintf(msg, sizeof(msg),
-             "update rtr_update set has_full = false where serial_num<>%u and serial_num<>%u;",
-             prevSerialNum, currSerialNum);
-    sta = statementscm_no_data(connection, msg);
-    checkErr(sta < 0, "Can't mark old rtr_full data as no longer available");
-
-    snprintf(msg, sizeof(msg),
-             "delete from rtr_full where serial_num<>%u and serial_num<>%u;",
-             prevSerialNum, currSerialNum);
-    sta = statementscm_no_data(connection, msg);
-    checkErr(sta < 0, "Can't delete old rtr_full data");
-
-    snprintf(msg, sizeof(msg),
-             "delete from rtr_update\n"
-             "where create_time < adddate(now(), interval -%zu hour)\n"
-             "and serial_num<>%u and serial_num<>%u;",
-             CONFIG_RPKI_RTR_RETENTION_HOURS_get(),
-             prevSerialNum, currSerialNum);
-    sta = statementscm_no_data(connection, msg);
-    checkErr(sta < 0, "Can't delete expired update metadata");
-
-    sta = statementscm_no_data(connection,
-                               "update rtr_update as r1\n"
-                               "left join rtr_update as r2 on r2.serial_num = r1.prev_serial_num\n"
-                               "set r1.prev_serial_num = NULL\n"
-                               "where r2.serial_num is null;");
-    checkErr(sta < 0,
-             "Can't mark old rtr_incremental data as no longer available");
-
-    sta = statementscm_no_data(connection,
-                               "delete rtr_incremental\n"
-                               "from rtr_incremental\n"
-                               "left join rtr_update on rtr_incremental.serial_num = rtr_update.serial_num\n"
-                               "where rtr_update.prev_serial_num is null;");
-    checkErr(sta < 0, "Can't delete old rtr_incremental data");
+    if (
+        !db_rtr_ignore_old_full(
+            db, current_serial, previous_serial) ||
+        !db_rtr_delete_old_full(
+            db, current_serial, previous_serial) ||
+        !db_rtr_delete_old_update(
+            db, current_serial, previous_serial) ||
+        !db_rtr_ignore_old_incremental(db) ||
+        !db_rtr_delete_old_incremental(db) ||
+        false)
+    {
+        LOG(LOG_ERR, "Error cleaning up old data.");
+        ret = EXIT_FAILURE;
+        goto done;
+    }
+
+
+done:
+
+    if (db != NULL)
+    {
+        db_disconnect(db);
+    }
+
+    if (done_db_thread_init)
+    {
+        db_thread_close();
+    }
+
+    if (done_db_init)
+    {
+        db_close();
+    }
 
     config_unload();
 
     CLOSE_LOG();
 
-    return 0;
+    return ret;
 }
diff --git a/lib/db/clients/rtr.c b/lib/db/clients/rtr.c
index 1fb2a96..6c0a553 100644
--- a/lib/db/clients/rtr.c
+++ b/lib/db/clients/rtr.c
@@ -8,13 +8,17 @@
 #include <arpa/inet.h>
 #include <inttypes.h>
 #include <stdio.h>
+#include <string.h>
+#include <limits.h>
 
 #include <my_global.h>
 #include <mysql.h>
 
+#include "config/config.h"
 #include "db/connect.h"
 #include "db/db-internal.h"
 #include "util/logging.h"
+#include "util/macros.h"
 #include "db/prep-stmt.h"
 #include "rtr.h"
 #include "db/util.h"
@@ -1293,3 +1297,569 @@ void db_rtr_reset_query_close(
     (void)conn;                 // to silence -Wunused-parameter
     free(query_state);
 }
+
+bool db_rtr_has_valid_session(
+    dbconn * conn)
+{
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_COUNT_SESSION];
+    int ret;
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    MYSQL_BIND bind_out[1];
+    memset(bind_out, 0, sizeof(bind_out));
+    unsigned long long session_count;
+    bind_out[0].buffer_type = MYSQL_TYPE_LONGLONG;
+    bind_out[0].is_unsigned = (my_bool)1;
+    bind_out[0].buffer = &session_count;
+
+    if (mysql_stmt_bind_result(stmt, bind_out))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_result() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        mysql_stmt_free_result(stmt);
+        return false;
+    }
+
+    if (mysql_stmt_store_result(stmt))
+    {
+        LOG(LOG_ERR, "mysql_stmt_store_result() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        mysql_stmt_free_result(stmt);
+        return false;
+    }
+
+    ret = mysql_stmt_fetch(stmt);
+    if (ret != 0)
+    {
+        LOG(LOG_ERR, "mysql_stmt_fetch() failed");
+        if (ret == 1)
+            LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+                mysql_stmt_error(stmt));
+        mysql_stmt_free_result(stmt);
+        return false;
+    }
+
+    mysql_stmt_free_result(stmt);
+
+    if (session_count == 0)
+    {
+        LOG(LOG_ERR, "The rpki-rtr database isn't initialized.");
+        LOG(LOG_ERR, "See %s-rpki-rtr-initialize.", PACKAGE_NAME);
+        return false;
+    }
+    else if (session_count != 1)
+    {
+        LOG(LOG_ERR,
+            "The rtr_session table has %llu "
+            "entries, which should never happen.",
+            session_count);
+        LOG(LOG_ERR,
+            "Consider running %s-rpki-rtr-clear and %s-rpki-rtr-initialize.",
+            PACKAGE_NAME, PACKAGE_NAME);
+        return false;
+    }
+    else
+    {
+        return true;
+    }
+}
+
+bool db_rtr_delete_incomplete_updates(
+    dbconn * conn)
+{
+    return
+        !wrap_mysql_stmt_execute(
+            conn,
+            conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_DELETE_INCOMPLETE_INCREMENTAL],
+            NULL)
+            &&
+        !wrap_mysql_stmt_execute(
+            conn,
+            conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_DELETE_INCOMPLETE_FULL],
+            NULL)
+            ;
+}
+
+bool db_rtr_good_serials(
+    dbconn * conn,
+    serial_number_t previous,
+    serial_number_t current)
+{
+    int ret;
+
+    // Convert previous and current to a type that MySQL can take.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned previous_uint = previous;
+    unsigned current_uint = current;
+
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_DETECT_INCONSISTENT_STATE];
+    MYSQL_BIND bind_in[3];
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[0].buffer = &current_uint;
+    bind_in[0].is_unsigned = (my_bool)1;
+    bind_in[0].is_null = (my_bool *)0;
+    bind_in[1].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[1].buffer = &current_uint;
+    bind_in[1].is_unsigned = (my_bool)1;
+    bind_in[1].is_null = (my_bool *)0;
+    bind_in[2].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[2].buffer = &previous_uint;
+    bind_in[2].is_unsigned = (my_bool)1;
+    bind_in[2].is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return false;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    MYSQL_BIND bind_out[1];
+    unsigned char is_inconsistent;
+    memset(bind_out, 0, sizeof(bind_out));
+    bind_out[0].buffer_type = MYSQL_TYPE_TINY;
+    bind_out[0].is_unsigned = 1;
+    bind_out[0].buffer = &is_inconsistent;
+
+    if (mysql_stmt_bind_result(stmt, bind_out))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_result() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        mysql_stmt_free_result(stmt);
+        return false;
+    }
+
+    ret = mysql_stmt_fetch(stmt);
+    if (ret != 0)
+    {
+        LOG(LOG_ERR, "mysql_stmt_fetch() failed");
+        if (ret == 1)
+            LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+                mysql_stmt_error(stmt));
+        mysql_stmt_free_result(stmt);
+        return false;
+    }
+
+    mysql_stmt_free_result(stmt);
+
+    return !is_inconsistent;
+}
+
+bool db_rtr_insert_full(
+    dbconn * conn,
+    serial_number_t serial)
+{
+    struct flag_tests flag_tests;
+    flag_tests_default(&flag_tests);
+
+    // Convert serial to a type that MySQL can take.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned serial_uint = serial;
+
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_INSERT_FULL];
+    MYSQL_BIND bind_in[1 + FLAG_TESTS_PARAMETERS];
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[0].buffer = &serial_uint;
+    bind_in[0].is_unsigned = (my_bool)1;
+    bind_in[0].is_null = (my_bool *)0;
+    flag_tests_bind(bind_in + 1, &flag_tests);
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return false;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+bool db_rtr_insert_incremental(
+    dbconn * conn,
+    serial_number_t previous_serial,
+    serial_number_t current_serial)
+{
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_INSERT_INCREMENTAL];
+    MYSQL_BIND bind_in[4];
+
+    // Convert previous_serial and current_serial to a type that MySQL
+    // can take.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned previous_serial_uint = previous_serial;
+    unsigned current_serial_uint = current_serial;
+
+    unsigned char is_announce;
+
+    // announcements
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[0].buffer = &current_serial_uint;
+    bind_in[0].is_unsigned = (my_bool)1;
+    bind_in[0].is_null = (my_bool *)0;
+    is_announce = 1;
+    bind_in[1].buffer_type = MYSQL_TYPE_TINY;
+    bind_in[1].buffer = &is_announce;
+    bind_in[1].is_unsigned = (my_bool)1;
+    bind_in[1].is_null = (my_bool *)0;
+    bind_in[2].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[2].buffer = &previous_serial_uint;
+    bind_in[2].is_unsigned = (my_bool)1;
+    bind_in[2].is_null = (my_bool *)0;
+    bind_in[3].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[3].buffer = &current_serial_uint;
+    bind_in[3].is_unsigned = (my_bool)1;
+    bind_in[3].is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return false;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    // withdrawals
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[0].buffer = &current_serial_uint;
+    bind_in[0].is_unsigned = (my_bool)1;
+    bind_in[0].is_null = (my_bool *)0;
+    is_announce = 0;
+    bind_in[1].buffer_type = MYSQL_TYPE_TINY;
+    bind_in[1].buffer = &is_announce;
+    bind_in[1].is_unsigned = (my_bool)1;
+    bind_in[1].is_null = (my_bool *)0;
+    bind_in[2].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[2].buffer = &current_serial_uint;
+    bind_in[2].is_unsigned = (my_bool)1;
+    bind_in[2].is_null = (my_bool *)0;
+    bind_in[3].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[3].buffer = &previous_serial_uint;
+    bind_in[3].is_unsigned = (my_bool)1;
+    bind_in[3].is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return false;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+int db_rtr_has_incremental_changes(
+    dbconn * conn,
+    serial_number_t serial)
+{
+    int ret;
+
+    // Convert serial to a type that MySQL can take.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned serial_uint = serial;
+
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_HAS_CHANGES];
+    MYSQL_BIND bind_in[1];
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[0].buffer = &serial_uint;
+    bind_in[0].is_unsigned = (my_bool)1;
+    bind_in[0].is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return -1;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, "mysql_stmt_execute() failed"))
+    {
+        return -1;
+    }
+
+    MYSQL_BIND bind_out[1];
+    memset(bind_out, 0, sizeof(bind_out));
+    unsigned char has_changes = 0;
+    bind_out[0].buffer_type = MYSQL_TYPE_TINY;
+    bind_out[0].buffer = &has_changes;
+    bind_out[0].is_unsigned = (my_bool)1;
+
+    if (mysql_stmt_bind_result(stmt, bind_out))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_result() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        mysql_stmt_free_result(stmt);
+        return -1;
+    }
+
+    ret = mysql_stmt_fetch(stmt);
+    if (ret != 0)
+    {
+        LOG(LOG_ERR, "mysql_stmt_fetch() failed");
+        if (ret == 1)
+            LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+                mysql_stmt_error(stmt));
+        mysql_stmt_free_result(stmt);
+        return -1;
+    }
+
+    mysql_stmt_free_result(stmt);
+
+    if (has_changes)
+    {
+        return 1;
+    }
+    else
+    {
+        return 0;
+    }
+}
+
+/*
+ * Insert a row into rtr_update for current_serial. The previous serial
+ * is stored as SQL NULL when previous_serial_is_null is set. Returns
+ * true on success and false if binding or execution fails.
+ */
+bool db_rtr_insert_update(
+    dbconn * conn,
+    serial_number_t current_serial,
+    serial_number_t previous_serial,
+    bool previous_serial_is_null)
+{
+    // MYSQL_TYPE_LONG parameters are read from int-sized buffers, so
+    // copy both serials into unsigned once we know they fit.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned new_serial = current_serial;
+    unsigned old_serial = previous_serial;
+
+    // MySQL wants the NULL indicator as a my_bool it can point at.
+    my_bool old_serial_absent = (my_bool)previous_serial_is_null;
+
+    MYSQL_STMT *query =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_INSERT_UPDATE];
+
+    MYSQL_BIND params[2];
+    memset(params, 0, sizeof(params));
+
+    // First placeholder: serial_num.
+    MYSQL_BIND *serial_param = &params[0];
+    serial_param->buffer_type = MYSQL_TYPE_LONG;
+    serial_param->buffer = &new_serial;
+    serial_param->is_unsigned = (my_bool)1;
+    serial_param->is_null = (my_bool *)0;
+
+    // Second placeholder: prev_serial_num, possibly NULL.
+    MYSQL_BIND *prev_param = &params[1];
+    prev_param->buffer_type = MYSQL_TYPE_LONG;
+    prev_param->buffer = &old_serial;
+    prev_param->is_unsigned = (my_bool)1;
+    prev_param->is_null = &old_serial_absent;
+
+    if (mysql_stmt_bind_param(query, params))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(query),
+            mysql_stmt_error(query));
+        return false;
+    }
+
+    return !wrap_mysql_stmt_execute(conn, query, NULL);
+}
+
+/*
+ * Delete every rtr_full row belonging to the given serial number.
+ * Returns true on success and false if binding or execution fails.
+ */
+bool db_rtr_delete_full(
+    dbconn * conn,
+    serial_number_t serial)
+{
+    // A MYSQL_TYPE_LONG parameter is read from an int-sized buffer, so
+    // copy the serial into an unsigned once we know it fits.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned doomed_serial = serial;
+
+    MYSQL_STMT *query =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_DELETE_USELESS_FULL];
+
+    MYSQL_BIND param;
+    memset(&param, 0, sizeof(param));
+    param.buffer_type = MYSQL_TYPE_LONG;
+    param.buffer = &doomed_serial;
+    param.is_unsigned = (my_bool)1;
+    param.is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(query, &param))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(query),
+            mysql_stmt_error(query));
+        return false;
+    }
+
+    return !wrap_mysql_stmt_execute(conn, query, NULL);
+}
+
+/*
+ * Clear has_full on every rtr_update row whose serial number is neither
+ * serial1 nor serial2. Returns true on success and false if binding or
+ * execution fails.
+ */
+bool db_rtr_ignore_old_full(
+    dbconn * conn,
+    serial_number_t serial1,
+    serial_number_t serial2)
+{
+    // Convert the serials to a type that MySQL can take, as the other
+    // functions in this file do, instead of pointing MYSQL_TYPE_LONG
+    // binds straight at serial_number_t storage.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned serial1_uint = serial1;
+    unsigned serial2_uint = serial2;
+
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_IGNORE_OLD_FULL];
+    MYSQL_BIND bind_in[2];
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[0].buffer = &serial1_uint;
+    bind_in[0].is_unsigned = (my_bool)1;
+    bind_in[0].is_null = (my_bool *)0;
+    bind_in[1].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[1].buffer = &serial2_uint;
+    bind_in[1].is_unsigned = (my_bool)1;
+    bind_in[1].is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return false;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+/*
+ * Delete every rtr_full row whose serial number is neither serial1 nor
+ * serial2. Returns true on success and false if binding or execution
+ * fails.
+ */
+bool db_rtr_delete_old_full(
+    dbconn * conn,
+    serial_number_t serial1,
+    serial_number_t serial2)
+{
+    // Convert the serials to a type that MySQL can take, as the other
+    // functions in this file do, instead of pointing MYSQL_TYPE_LONG
+    // binds straight at serial_number_t storage.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned serial1_uint = serial1;
+    unsigned serial2_uint = serial2;
+
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_DELETE_OLD_FULL];
+    MYSQL_BIND bind_in[2];
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[0].buffer = &serial1_uint;
+    bind_in[0].is_unsigned = (my_bool)1;
+    bind_in[0].is_null = (my_bool *)0;
+    bind_in[1].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[1].buffer = &serial2_uint;
+    bind_in[1].is_unsigned = (my_bool)1;
+    bind_in[1].is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return false;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+/*
+ * Delete rtr_update rows older than the configured retention period,
+ * keeping the rows for serial1 and serial2 regardless of their age.
+ * Returns true on success and false if binding or execution fails.
+ */
+bool db_rtr_delete_old_update(
+    dbconn * conn,
+    serial_number_t serial1,
+    serial_number_t serial2)
+{
+    // The retention period is bound as a signed 64-bit integer.
+    long long retention_hours =
+        (long long)CONFIG_RPKI_RTR_RETENTION_HOURS_get();
+
+    // Convert the serials to a type that MySQL can take, as the other
+    // functions in this file do, instead of pointing MYSQL_TYPE_LONG
+    // binds straight at serial_number_t storage.
+    COMPILE_TIME_ASSERT(
+        TYPE_CAN_HOLD_UINT(unsigned, serial_number_t));
+    unsigned serial1_uint = serial1;
+    unsigned serial2_uint = serial2;
+
+    MYSQL_STMT *stmt =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_DELETE_OLD_UPDATE];
+    MYSQL_BIND bind_in[3];
+    memset(bind_in, 0, sizeof(bind_in));
+    bind_in[0].buffer_type = MYSQL_TYPE_LONGLONG;
+    bind_in[0].buffer = &retention_hours;
+    bind_in[0].is_unsigned = (my_bool)0;
+    bind_in[0].is_null = (my_bool *)0;
+    bind_in[1].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[1].buffer = &serial1_uint;
+    bind_in[1].is_unsigned = (my_bool)1;
+    bind_in[1].is_null = (my_bool *)0;
+    bind_in[2].buffer_type = MYSQL_TYPE_LONG;
+    bind_in[2].buffer = &serial2_uint;
+    bind_in[2].is_unsigned = (my_bool)1;
+    bind_in[2].is_null = (my_bool *)0;
+
+    if (mysql_stmt_bind_param(stmt, bind_in))
+    {
+        LOG(LOG_ERR, "mysql_stmt_bind_param() failed");
+        LOG(LOG_ERR, "    %u: %s\n", mysql_stmt_errno(stmt),
+            mysql_stmt_error(stmt));
+        return false;
+    }
+
+    if (wrap_mysql_stmt_execute(conn, stmt, NULL))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+/*
+ * Run the parameterless DB_PSTMT_RTR_IGNORE_OLD_INCREMENTAL statement.
+ * Returns true if it executed successfully, false otherwise.
+ */
+bool db_rtr_ignore_old_incremental(
+    dbconn * conn)
+{
+    MYSQL_STMT *query =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_IGNORE_OLD_INCREMENTAL];
+
+    if (wrap_mysql_stmt_execute(conn, query, NULL))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+/*
+ * Run the parameterless DB_PSTMT_RTR_DELETE_OLD_INCREMENTAL statement.
+ * Returns true if it executed successfully, false otherwise.
+ */
+bool db_rtr_delete_old_incremental(
+    dbconn * conn)
+{
+    MYSQL_STMT *query =
+        conn->stmts[DB_CLIENT_TYPE_RTR][DB_PSTMT_RTR_DELETE_OLD_INCREMENTAL];
+
+    if (wrap_mysql_stmt_execute(conn, query, NULL))
+    {
+        return false;
+    }
+
+    return true;
+}
diff --git a/lib/db/clients/rtr.h b/lib/db/clients/rtr.h
index f79c0ef..6bdd14e 100644
--- a/lib/db/clients/rtr.h
+++ b/lib/db/clients/rtr.h
@@ -93,5 +93,139 @@ void db_rtr_reset_query_close(
     dbconn * conn,
     void *query_state);
 
+/**
+    @brief Determine if there's a valid rpki-rtr session.
+*/
+bool db_rtr_has_valid_session(
+    dbconn * conn);
+
+/**
+    @brief Delete incomplete updates.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_delete_incomplete_updates(
+    dbconn * conn);
+
+/**
+    @brief Detect if using the given serial numbers would put the
+        database into a weird or inconsistent state.
+
+    @return True if the serial numbers are ok to use, false if they're
+        not or there was an error.
+*/
+bool db_rtr_good_serials(
+    dbconn * conn,
+    serial_number_t previous,
+    serial_number_t current);
+
+/**
+    @brief Copy the current state of the RPKI cache to the rtr_full
+        table, using the given serial number.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_insert_full(
+    dbconn * conn,
+    serial_number_t serial);
+
+/**
+    @brief Compute the incremental changes from previous_serial to
+        current_serial.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_insert_incremental(
+    dbconn * conn,
+    serial_number_t previous_serial,
+    serial_number_t current_serial);
+
+/**
+    @brief Determine if the serial number has any changes from the
+        serial before it.
+
+    @return If there are any changes, 1. If there are no changes, 0.
+        If there's an error, -1.
+*/
+int db_rtr_has_incremental_changes(
+    dbconn * conn,
+    serial_number_t serial);
+
+/**
+    @brief Mark an update as available.
+
+    @param[in] conn DB connection.
+    @param[in] current_serial Current serial number for the update.
+    @param[in] previous_serial Serial number for the previous update.
+        This is ignored if @p previous_serial_is_null.
+    @param[in] previous_serial_is_null Whether or not there was a
+        previous update.
+    @return True on success, false on failure.
+*/
+bool db_rtr_insert_update(
+    dbconn * conn,
+    serial_number_t current_serial,
+    serial_number_t previous_serial,
+    bool previous_serial_is_null);
+
+/**
+    @brief Delete the rtr_full data for a single serial number.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_delete_full(
+    dbconn * conn,
+    serial_number_t serial);
+
+/**
+    @brief Mark full data for serials other than serial1 or serial2
+        as unavailable.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_ignore_old_full(
+    dbconn * conn,
+    serial_number_t serial1,
+    serial_number_t serial2);
+
+/**
+    @brief Delete full data for serials other than serial1 or serial2.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_delete_old_full(
+    dbconn * conn,
+    serial_number_t serial1,
+    serial_number_t serial2);
+
+/**
+    @brief Delete updates older than the configured retention
+        interval, with the exception that serial1 and serial2
+        are kept regardless of age.
+
+    The rows are removed from rtr_update outright rather than being
+    flagged, so the deleted serials are no longer available at all.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_delete_old_update(
+    dbconn * conn,
+    serial_number_t serial1,
+    serial_number_t serial2);
+
+/**
+    @brief Mark incremental data unavailable if the previous serial
+        is already unavailable.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_ignore_old_incremental(
+    dbconn * conn);
+
+/**
+    @brief Delete any incremental data that's no longer available.
+
+    @return True on success, false on failure.
+*/
+bool db_rtr_delete_old_incremental(
+    dbconn * conn);
+
 
 #endif
diff --git a/lib/db/prep-stmt.c b/lib/db/prep-stmt.c
index f6e1a8c..214a9a6 100644
--- a/lib/db/prep-stmt.c
+++ b/lib/db/prep-stmt.c
@@ -36,6 +36,98 @@ static const char *_queries_rtr[] = {
         " from rtr_full "
         " where serial_num=? " " order by asn, ip_addr " " limit ?, ?",
 
+    // DB_PSTMT_RTR_COUNT_SESSION
+    "select count(*) from rtr_session",
+
+    // DB_PSTMT_RTR_DELETE_INCOMPLETE_INCREMENTAL
+    "delete rtr_incremental "
+    "from rtr_incremental "
+    "left join rtr_update on "
+    "    rtr_incremental.serial_num = rtr_update.serial_num "
+    "where rtr_update.serial_num is null",
+
+    // DB_PSTMT_RTR_DELETE_INCOMPLETE_FULL
+    "delete rtr_full "
+    "from rtr_full "
+    "left join rtr_update on "
+    "    rtr_full.serial_num = rtr_update.serial_num "
+    "where rtr_update.serial_num is null",
+
+    // DB_PSTMT_RTR_DETECT_INCONSISTENT_STATE
+    "select count(*) > 0 "
+    "from rtr_update "
+    "where "
+    "    serial_num = ? or "
+    "    prev_serial_num = ? or "
+    "    prev_serial_num = ?",
+
+    // DB_PSTMT_RTR_INSERT_FULL
+    "insert ignore into rtr_full "
+    "(serial_num, asn, prefix, prefix_length, prefix_max_length) "
+    "select "
+    "    ?, "
+    "    rpki_roa.asn, "
+    "    rpki_roa_prefix.prefix, "
+    "    rpki_roa_prefix.prefix_length, "
+    "    rpki_roa_prefix.prefix_max_length "
+    "from rpki_roa "
+    "join rpki_roa_prefix on "
+    "    rpki_roa_prefix.roa_local_id = rpki_roa.local_id "
+    "where " FLAG_TESTS_EXPRESSION("rpki_roa.flags"),
+
+    // DB_PSTMT_RTR_INSERT_INCREMENTAL
+    "insert into rtr_incremental "
+    "(serial_num, is_announce, asn, prefix, prefix_length, prefix_max_length) "
+    "select ?, ?, t1.asn, t1.prefix, t1.prefix_length, t1.prefix_max_length "
+    "from rtr_full as t1 "
+    "left join rtr_full as t2 on "
+    "    t2.serial_num = ? and "
+    "    t2.asn = t1.asn and "
+    "    t2.prefix = t1.prefix and "
+    "    t2.prefix_length = t1.prefix_length and "
+    "    t2.prefix_max_length = t1.prefix_max_length "
+    "where t1.serial_num = ? and t2.serial_num is null",
+
+    // DB_PSTMT_RTR_HAS_CHANGES
+    "select count(*) > 0 from rtr_incremental where serial_num = ?",
+
+    // DB_PSTMT_RTR_INSERT_UPDATE
+    "insert into rtr_update "
+    "(serial_num, prev_serial_num, create_time, has_full) "
+    "values (?, ?, now(), true)",
+
+    // DB_PSTMT_RTR_DELETE_USELESS_FULL
+    "delete from rtr_full where serial_num = ?",
+
+    // DB_PSTMT_RTR_IGNORE_OLD_FULL
+    "update rtr_update "
+    "set has_full = false "
+    "where serial_num <> ? and serial_num <> ?",
+
+    // DB_PSTMT_RTR_DELETE_OLD_FULL
+    "delete from rtr_full "
+    "where serial_num <> ? and serial_num <> ?",
+
+    // DB_PSTMT_RTR_DELETE_OLD_UPDATE
+    "delete from rtr_update "
+    "where "
+    "    create_time < adddate(now(), interval (-1 * ?) hour) and "
+    "    serial_num <> ? and serial_num <> ?",
+
+    // DB_PSTMT_RTR_IGNORE_OLD_INCREMENTAL
+    "update rtr_update as r1 "
+    "left join rtr_update as r2 on "
+    "    r2.serial_num = r1.prev_serial_num "
+    "set r1.prev_serial_num = null "
+    "where r2.serial_num is null",
+
+    // DB_PSTMT_RTR_DELETE_OLD_INCREMENTAL
+    "delete rtr_incremental "
+    "from rtr_incremental "
+    "left join rtr_update on "
+    "    rtr_incremental.serial_num = rtr_update.serial_num "
+    "where rtr_update.prev_serial_num is null",
+
     NULL
 };
 
diff --git a/lib/db/prep-stmt.h b/lib/db/prep-stmt.h
index e0120f4..5a097d8 100644
--- a/lib/db/prep-stmt.h
+++ b/lib/db/prep-stmt.h
@@ -21,6 +21,20 @@ enum prep_stmts_rtr {
     DB_PSTMT_RTR_READ_SER_NUM_AS_CURRENT,
     DB_PSTMT_RTR_SERIAL_QRY_GET_NEXT,
     DB_PSTMT_RTR_RESET_QRY_GET_NEXT,
+    DB_PSTMT_RTR_COUNT_SESSION,
+    DB_PSTMT_RTR_DELETE_INCOMPLETE_INCREMENTAL,
+    DB_PSTMT_RTR_DELETE_INCOMPLETE_FULL,
+    DB_PSTMT_RTR_DETECT_INCONSISTENT_STATE,
+    DB_PSTMT_RTR_INSERT_FULL,
+    DB_PSTMT_RTR_INSERT_INCREMENTAL,
+    DB_PSTMT_RTR_HAS_CHANGES,
+    DB_PSTMT_RTR_INSERT_UPDATE,
+    DB_PSTMT_RTR_DELETE_USELESS_FULL,
+    DB_PSTMT_RTR_IGNORE_OLD_FULL,
+    DB_PSTMT_RTR_DELETE_OLD_FULL,
+    DB_PSTMT_RTR_DELETE_OLD_UPDATE,
+    DB_PSTMT_RTR_IGNORE_OLD_INCREMENTAL,
+    DB_PSTMT_RTR_DELETE_OLD_INCREMENTAL,
 };
 
 enum prep_stmts_chaser {
diff --git a/lib/db/util.c b/lib/db/util.c
index 84bd9c6..0405d7a 100644
--- a/lib/db/util.c
+++ b/lib/db/util.c
@@ -1,6 +1,7 @@
 #include <ctype.h>
 #include <inttypes.h>
 #include <stdarg.h>
+#include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -9,6 +10,9 @@
 #include <mysql.h>
 #include <errmsg.h>
 
+#include "config/config.h"
+#include "rpki/db_constants.h"
+
 #include "db-internal.h"
 #include "util/logging.h"
 #include "util.h"
@@ -115,3 +119,88 @@ int getStringByFieldname(
 
     return 0;
 }
+
+/*
+ * Reset the tests so that no flag bits are selected and the required
+ * result is zero.
+ */
+void flag_tests_empty(
+    struct flag_tests * tests)
+{
+    tests->result = 0;
+    tests->mask = 0;
+}
+
+/*
+ * Build the default flag tests from the program's configuration:
+ * SCM_FLAG_VALIDATED is always required, and each configuration option
+ * that is off adds one further test.
+ */
+void flag_tests_default(
+    struct flag_tests * tests)
+{
+    // NOTE: This must be kept in sync with addQueryFlagTests.
+
+    flag_tests_empty(tests);
+
+    // Required no matter how the program is configured.
+    flag_tests_add_tests_by_mask(tests, SCM_FLAG_VALIDATED, true);
+
+    // Each remaining test applies only while its option is disabled.
+    if (!CONFIG_RPKI_ALLOW_STALE_VALIDATION_CHAIN_get())
+    {
+        // Flag must be clear.
+        flag_tests_add_tests_by_mask(tests, SCM_FLAG_NOCHAIN, false);
+    }
+
+    if (!CONFIG_RPKI_ALLOW_STALE_CRL_get())
+    {
+        // Flag must be clear.
+        flag_tests_add_tests_by_mask(tests, SCM_FLAG_STALECRL, false);
+    }
+
+    if (!CONFIG_RPKI_ALLOW_STALE_MANIFEST_get())
+    {
+        // Flag must be clear.
+        flag_tests_add_tests_by_mask(tests, SCM_FLAG_STALEMAN, false);
+    }
+
+    if (!CONFIG_RPKI_ALLOW_NO_MANIFEST_get())
+    {
+        // Unlike the others, this flag must be set.
+        flag_tests_add_tests_by_mask(tests, SCM_FLAG_ONMAN, true);
+    }
+
+    if (!CONFIG_RPKI_ALLOW_NOT_YET_get())
+    {
+        // Flag must be clear.
+        flag_tests_add_tests_by_mask(tests, SCM_FLAG_NOTYET, false);
+    }
+}
+
+/*
+ * Add a test for the single flag at bit position flag. The shift is
+ * done in unsigned long long, so flag must be less than that type's
+ * width.
+ */
+void flag_tests_add_test_by_index(
+    struct flag_tests * tests,
+    uint_fast16_t flag,
+    bool isset)
+{
+    unsigned long long single_bit = 1ULL << flag;
+
+    flag_tests_add_tests_by_mask(tests, single_bit, isset);
+}
+
+/*
+ * Add every bit in mask to the set of tested flags, and record whether
+ * those bits must be set (isset true) or clear (isset false). Bits
+ * outside mask are left as they were.
+ */
+void flag_tests_add_tests_by_mask(
+    struct flag_tests * tests,
+    unsigned long long mask,
+    bool isset)
+{
+    tests->mask |= mask;
+
+    tests->result = isset
+        ? (tests->result | mask)
+        : (tests->result & ~mask);
+}
+
+/*
+ * Fill two consecutive input bindings, starting at parameters[0], with
+ * the mask and then the required result, both as unsigned 64-bit
+ * values. The bindings point into *tests.
+ */
+void flag_tests_bind(
+    MYSQL_BIND * parameters,
+    struct flag_tests const * tests)
+{
+    MYSQL_BIND *mask_param = &parameters[0];
+    MYSQL_BIND *result_param = &parameters[1];
+
+    // MYSQL_BIND.buffer is a plain void pointer, hence the casts that
+    // drop const.
+    mask_param->buffer_type = MYSQL_TYPE_LONGLONG;
+    mask_param->buffer = (void *)&tests->mask;
+    mask_param->is_unsigned = (my_bool)1;
+    mask_param->is_null = (my_bool *)0;
+
+    result_param->buffer_type = MYSQL_TYPE_LONGLONG;
+    result_param->buffer = (void *)&tests->result;
+    result_param->is_unsigned = (my_bool)1;
+    result_param->is_null = (my_bool *)0;
+}
diff --git a/lib/db/util.h b/lib/db/util.h
index 7fa3769..731a6bb 100644
--- a/lib/db/util.h
+++ b/lib/db/util.h
@@ -1,6 +1,12 @@
 #ifndef _DB_UTIL_H
 #define _DB_UTIL_H
 
+#include <inttypes.h>
+
+#include <my_global.h>
+#include <mysql.h>
+#include <errmsg.h>
+
 #include "connect.h"
 
 
@@ -19,5 +25,87 @@ int getStringByFieldname(
     MYSQL_ROW row,
     char field_name[]);
 
+/**
+ * @brief Parameterized SQL expression to test a field containing
+ *     binary flags.
+ *
+ * The expansion is built by string-literal concatenation, so @p field
+ * must itself be a string literal. It contributes two `?`
+ * placeholders: the bit mask first, then the value the masked field
+ * must equal. In MySQL `&` binds more tightly than `=`, so the
+ * expression reads as `(field & ?) = ?`.
+ *
+ * @param[in] field Name of the SQL field to test.
+ */
+#define FLAG_TESTS_EXPRESSION(field) \
+    "(" field " & ? = ?)"
+
+/**
+ * @brief Number of parameters introduced by #FLAG_TESTS_EXPRESSION.
+ */
+#define FLAG_TESTS_PARAMETERS 2
+
+/**
+ * @brief Structure to describe multiple binary flag tests.
+ *
+ * A flags value passes the tests when (value & mask) equals result.
+ */
+struct flag_tests
+{
+    /**
+     * @brief Bit mask for the flags field.
+     *
+     * A bit is 1 here for every flag that is being tested.
+     */
+    unsigned long long mask;
+
+    /**
+     * @brief Result required when ANDing the field with #mask.
+     *
+     * For each tested flag, the bit is 1 if the flag must be set and
+     * 0 if it must be clear.
+     */
+    unsigned long long result;
+};
+
+/**
+ * @brief Initialize flag tests to an empty set of tests.
+ */
+void flag_tests_empty(
+    struct flag_tests * tests);
+
+/**
+ * @brief Initialize flag tests to the runtime default.
+ *
+ * This function sets the appropriate tests as determined by the
+ * program's configuration. See #addQueryFlagTests for the older
+ * version of this.
+ */
+void flag_tests_default(
+    struct flag_tests * tests);
+
+/**
+ * @brief Add a single test to the flag tests.
+ *
+ * @param[in,out] tests Tests to add to.
+ * @param[in] flag Which flag to test. This must be >= 0 and < 64.
+ * @param[in] isset Whether the flag must be set (1) or clear (0).
+ */
+void flag_tests_add_test_by_index(
+    struct flag_tests * tests,
+    uint_fast16_t flag,
+    bool isset);
+
+/**
+ * @brief Add one or more tests to the flag tests.
+ *
+ * @param[in,out] tests Tests to add to.
+ * @param[in] mask A bitmask of flags to test.
+ * @param[in] isset Whether the flags must be set (1) or clear (0).
+ */
+void flag_tests_add_tests_by_mask(
+    struct flag_tests * tests,
+    unsigned long long mask,
+    bool isset);
+
+/**
+ * @brief Fill in query parameters for the specified tests.
+ *
+ * The query being bound must contain #FLAG_TESTS_EXPRESSION, and
+ * @p parameters must point into the input binding array at the point
+ * where #FLAG_TESTS_EXPRESSION starts. #FLAG_TESTS_PARAMETERS
+ * parameters will be written to the binding array.
+ */
+void flag_tests_bind(
+    MYSQL_BIND * parameters,
+    struct flag_tests const * tests);
+
 
 #endif                          // _DB_UTIL_H
diff --git a/lib/rpki-rtr/pdu.h b/lib/rpki-rtr/pdu.h
index d788c30..2622c5c 100644
--- a/lib/rpki-rtr/pdu.h
+++ b/lib/rpki-rtr/pdu.h
@@ -57,6 +57,8 @@ typedef uint32_t as_number_t;
 #define PRISESSION PRIu16
 #define PRISERIAL PRIu32
 
+#define SCNSERIAL SCNu32
+
 
 /**
        @return true iff s1 is greater than s2 using serial number arithmetic
diff --git a/lib/rpki/querySupport.c b/lib/rpki/querySupport.c
index 3385319..fb47e0f 100644
--- a/lib/rpki/querySupport.c
+++ b/lib/rpki/querySupport.c
@@ -25,6 +25,8 @@ void addQueryFlagTests(
     char *whereStr,
     int needAnd)
 {
+    // NOTE: This must be kept in sync with flag_tests_default.
+
     addFlagTest(whereStr, SCM_FLAG_VALIDATED, 1, needAnd);
     if (!CONFIG_RPKI_ALLOW_STALE_VALIDATION_CHAIN_get())
         addFlagTest(whereStr, SCM_FLAG_NOCHAIN, 0, 1);
diff --git a/lib/util/macros.h b/lib/util/macros.h
index ac068ff..9c3f0a3 100644
--- a/lib/util/macros.h
+++ b/lib/util/macros.h
@@ -10,6 +10,14 @@
                } \
        } while (false)
 
+/**
+    @brief Test if the unsigned integer type @p container_type is able
+        to hold any value that the unsigned integer type @p datum_type
+        can hold.
+
+    Converting -1 to an unsigned type yields that type's maximum
+    value, so the test compares the two maxima. The result is an
+    integer constant expression, so it can be used in compile-time
+    assertions.
+*/
+#define TYPE_CAN_HOLD_UINT(container_type, datum_type) \
+    ((container_type)-1 >= (datum_type)-1)
+
 #ifdef __GNUC__
 
   // check printfs
diff --git a/mk/libdb.mk b/mk/libdb.mk
index a9a381c..34f6743 100644
--- a/mk/libdb.mk
+++ b/mk/libdb.mk
@@ -2,6 +2,7 @@ noinst_LIBRARIES += lib/db/libdb.a
 
 LDADD_LIBDB = \
        lib/db/libdb.a \
+       $(LDADD_LIBRPKIRTR) \
        $(LDADD_LIBCONFIG)
 
 lib_db_libdb_a_SOURCES = \
diff --git a/mk/rpki-rtr.mk b/mk/rpki-rtr.mk
index 9bdc13c..b88da55 100644
--- a/mk/rpki-rtr.mk
+++ b/mk/rpki-rtr.mk
@@ -40,7 +40,7 @@ bin_rpki_rtr_rpki_rtr_update_SOURCES = \
        bin/rpki-rtr/rtr-update.c
 
 bin_rpki_rtr_rpki_rtr_update_LDADD = \
-       $(LDADD_LIBRPKI)
+       $(LDADD_LIBDB)
 
 
 pkglibexec_SCRIPTS += bin/rpki-rtr/rpki-rtr-clear
-- 
1.9.1


------------------------------------------------------------------------------
_______________________________________________
rpstir-devel mailing list
rpstir-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/rpstir-devel

Reply via email to