Hello,
I'm writing regarding a fix in the latest PostgreSQL releases
(specifically, I've looked at versions 15.13 and 16.9). The fix in question
is:
Fix data loss in logical replication
This is the change set:
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f21be08e
I also found this related thread in pgsql-bugs:
https://www.postgresql.org/message-id/CAA4eK1L7CA-A%3DVMn8fiugZ%2BCRt%2Bwz473Adrx3nxq8Ougu%3DO2kQ%40mail.gmail.com

In our implementation of row-level security (which is relatively complex),
we rely heavily on temporary tables that we create with every query.
Unfortunately, this approach appears to be incompatible with the fix above,
because creating a temporary table is itself a DDL change. If queries are
issued quickly enough while a logical replication stream is open, executing:
SELECT * FROM pg_logical_slot_get_changes('slot_name', null, null);
results in a memory allocation error (perhaps related to the bug above?):
ERROR:  invalid memory alloc request size 1086094000
Time: 17449.051 ms (00:17.449)
Additionally, note that executing pg_logical_slot_get_changes() is quite
slow (17 s above). Steps for reproducing (outside of our environment) and
system information are at the bottom.

Would it be possible for PostgreSQL to exclude temporary table creation
from the DDL changes that send "invalidation messages from catalog-modifying
transactions to all concurrent in-progress transactions"?
It seems unnecessary to rebuild the catalog cache when decoding new changes
after concurrent DDL, since a temporary table doesn't persist outside the
session (or, with ON COMMIT DROP, the transaction) that created it. Or will
we need to reimplement our approach to RLS?

Note that we previously used session variables (instead of temp tables) for
storing the current user's permissions for the table policy to query, but
this led to performance issues (likely due to plan invalidation, though this
was some time ago, so the details are hazy).
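
For reference, the session-variable approach we moved away from looked
roughly like this (a hypothetical sketch; the setting name and policy are
illustrative, not our actual code):

-- Application sets the permitted ids once per session or transaction:
SELECT set_config('app.permitted_ids', '1,2,3', false);

-- The policy reads the setting back; the second argument to
-- current_setting() makes a missing setting return NULL instead of erroring:
CREATE POLICY user_can_read ON protected_table
    FOR SELECT
    USING (id = ANY (string_to_array(
        current_setting('app.permitted_ids', true), ',')));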

Any suggestions/guidance would be appreciated. We are using the wal2json
plugin, but I'm under the impression that it wouldn't matter which plugin
we use, given that the error occurs when executing
pg_logical_slot_get_changes(); this could be an incorrect assumption on
my part, though.
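
To rule out wal2json, the same workload could presumably be run against a
slot using the built-in test_decoding plugin (an untested sketch; I have not
verified this myself):

-- Create a second slot with the built-in test_decoding output plugin:
SELECT * FROM pg_create_logical_replication_slot('slot_name_td', 'test_decoding');
-- ...run the workload below, then attempt to consume the changes:
SELECT * FROM pg_logical_slot_get_changes('slot_name_td', null, null);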

Regards,
Anne Struble

--------------------------------------------------------------------------------------------------------------------
Steps To Reproduce:
PostgreSQL 16.9
Linux Ubuntu 20.04
shared_buffers: 128MB
work_mem: 4MB
logical_decoding_work_mem: 64MB
maintenance_work_mem: 64MB
max_connections: 1000

Java for establishing the replication slot:

public static void start_simple(String username, String password, String db)
        throws SQLException {
    String slotName = "slot_name";
    log.info("Starting streaming service");

    // Set up connection properties for a logical replication connection
    Properties props = new Properties();
    PGProperty.USER.set(props, username);
    PGProperty.PASSWORD.set(props, password);
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "15.2");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");
    PGProperty.TCP_KEEP_ALIVE.set(props, true);
    PGProperty.SOCKET_TIMEOUT.set(props, 0);

    String url = "jdbc:postgresql://localhost:5432/" + db;

    // Drop any pre-existing slot with the same name
    try (Connection conn = DriverManager.getConnection(url, props);
         PreparedStatement stmt =
                 conn.prepareStatement("SELECT * FROM pg_drop_replication_slot(?)")) {
        stmt.setString(1, slotName);
        stmt.execute();
        log.info("Replication slot was dropped");
    } catch (Exception e) {
        log.error("Failure dropping replication slot");
    }

    // Create the logical replication slot with the wal2json output plugin
    try (Connection conn = DriverManager.getConnection(url, props);
         PreparedStatement stmt =
                 conn.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")) {
        stmt.setString(1, slotName);
        stmt.setString(2, "wal2json");
        try (ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                log.info("Slot Name: " + rs.getString(1));
                log.info("Xlog Position: " + rs.getString(2));
            }
        }
    }
}

Python for running queries (test_test is a table with one field and 10
records). Four fast queries are executed repeatedly every 0.01 seconds;
before each fast query runs, 6 temporary tables are created:

import threading

import psycopg2


def main(database_params):
    def execute_query(query):
        with psycopg2.connect(**database_params) as conn:
            with conn.cursor() as cursor:
                # Create six temporary tables (temp_table, temp_table2, ...,
                # temp_table6) before running the query; each CREATE
                # TEMPORARY TABLE is a DDL change
                for i in range(1, 7):
                    suffix = '' if i == 1 else str(i)
                    cursor.execute(
                        'CREATE TEMPORARY TABLE temp_table%s '
                        '(id text, f1 text, f2 text, f3 text, f4 text, f5 text)'
                        % suffix
                    )

                cursor.execute(query)

    def execute_repeatedly(query):
        # Run the query, then schedule the next run 0.01 seconds out
        execute_query(query)
        threading.Timer(.01, execute_repeatedly, args=[query]).start()

    execute_repeatedly('select count(*) from test_test')
    execute_repeatedly('select count(*) from test_test')
    execute_repeatedly('select count(*) from test_test')
    execute_repeatedly('select count(*) from test_test')

SQL for triggering the memory allocation error (executed using psql):
SELECT * FROM pg_logical_slot_get_changes('slot_name', null, null);
