Hello,

I'm writing regarding a fix in the latest PostgreSQL releases (specifically, I've looked at versions 15.13 and 16.9): "Fix data loss in logical replication".

This is the change set:
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=9f21be08e

I also found this related thread in pgsql-bugs:
https://www.postgresql.org/message-id/CAA4eK1L7CA-A%3DVMn8fiugZ%2BCRt%2Bwz473Adrx3nxq8Ougu%3DO2kQ%40mail.gmail.com
Our implementation of row-level security (which is relatively complex) relies heavily on temporary tables that we create with every query. Unfortunately, this approach appears to be incompatible with the fix above, because creating a temporary table counts as a DDL change. If queries are issued quickly enough while a logical replication stream is open, executing:

    SELECT * FROM pg_logical_slot_get_changes('slot_name', null, null);

results in a memory allocation error (possibly related to the bug above?):

    ERROR:  invalid memory alloc request size 1086094000
    Time: 17449.051 ms (00:17.449)

Note also that pg_logical_slot_get_changes is quite slow here (17 s above). Steps for reproducing this outside of our environment, along with system information, are at the bottom.

Would it be possible for Postgres to exclude temporary-table creation from the DDL changes that send "invalidation messages from catalog-modifying transactions to all concurrent in-progress transactions"? Since a temporary table is never visible outside the session that created it, requiring a rebuild of the catalog cache when decoding subsequent changes seems unnecessary. Or will we need to reimplement our approach to RLS?

We previously used session variables (instead of temp tables) to store the current user's permissions for the table policies to query, but that led to performance issues (likely due to plan invalidation, though this was some time ago and the details are hazy). Any suggestions or guidance would be appreciated.

We are using the wal2json plugin, but my understanding is that the choice of plugin shouldn't matter, given that the error occurs when executing pg_logical_slot_get_changes(); this could be an incorrect assumption on my part.
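For context, the session-variable approach we used before looked roughly along these lines (heavily simplified; the setting name, table, and policy here are illustrative, not our actual schema):

    -- Per request, instead of CREATE TEMPORARY TABLE: store the user's
    -- permissions in a transaction-local setting (third argument = is_local)
    SELECT set_config('app.current_user_perms', 'read,write', true);

    -- ...and have the policy read the setting instead of joining a temp table
    -- (second argument to current_setting() = missing_ok, so an unset value
    -- yields NULL rather than an error):
    CREATE POLICY user_can_read ON protected_table
        FOR SELECT
        USING (current_setting('app.current_user_perms', true) LIKE '%read%');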
Regards,
Anne Struble

--------------------------------------------------------------------------------

Steps To Reproduce:

Postgres: v16.9
OS: Linux (Ubuntu 20.04)
shared_buffers: 128MB
work_mem: 4MB
logical_decoding_work_mem: 64MB
maintenance_work_mem: 64MB
max_connections: 1000

Java for establishing the replication slot:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Properties;
    import org.postgresql.PGProperty;

    public static void start_simple(String username, String password, String db) throws SQLException {
        Connection conn;
        String slotName = "slot_name";
        log.info("Starting streaming service");

        // Set up connection properties
        Properties props = new Properties();
        PGProperty.USER.set(props, username);
        PGProperty.PASSWORD.set(props, password);
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "15.2");
        PGProperty.REPLICATION.set(props, "database");
        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
        PGProperty.TCP_KEEP_ALIVE.set(props, true);
        PGProperty.SOCKET_TIMEOUT.set(props, 0);

        // URL
        String url = "jdbc:postgresql://" + "localhost" + ":" + "5432" + "/" + db;

        // Drop any existing slot with this name
        conn = DriverManager.getConnection(url, props);
        String sql = "SELECT * from pg_drop_replication_slot(?);";
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.setString(1, slotName);
            stmt.execute();
            log.info("Replication slot was dropped");
        } catch (Exception e) {
            log.error("Failure dropping replication slot");
        }

        // Create the logical replication slot with the wal2json plugin
        conn = DriverManager.getConnection(url, props);
        try (PreparedStatement preparedStatement =
                conn.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")) {
            preparedStatement.setString(1, slotName);
            preparedStatement.setString(2, "wal2json");
            try (ResultSet rs = preparedStatement.executeQuery()) {
                while (rs.next()) {
                    log.info("Slot Name: " + rs.getString(1));
                    log.info("Xlog Position: " + rs.getString(2));
                }
            }
        }
    }

Python for running the queries (test_test is a table with one field and 10 records). Four fast queries are executed repeatedly every 0.01 seconds; before each fast query runs, six temporary tables are created:

    import threading
    import psycopg2

    def main(database_params):
        def execute_query(query):
            # New connection per query; the six temp-table creations mimic our RLS pattern
            with psycopg2.connect(**database_params) as conn:
                with conn.cursor() as cursor:
                    cursor.execute('CREATE TEMPORARY TABLE temp_table (id text, f1 text, f2 text, f3 text, f4 text, f5 text)')
                    cursor.execute('CREATE TEMPORARY TABLE temp_table2 (id text, f1 text, f2 text, f3 text, f4 text, f5 text)')
                    cursor.execute('CREATE TEMPORARY TABLE temp_table3 (id text, f1 text, f2 text, f3 text, f4 text, f5 text)')
                    cursor.execute('CREATE TEMPORARY TABLE temp_table4 (id text, f1 text, f2 text, f3 text, f4 text, f5 text)')
                    cursor.execute('CREATE TEMPORARY TABLE temp_table5 (id text, f1 text, f2 text, f3 text, f4 text, f5 text)')
                    cursor.execute('CREATE TEMPORARY TABLE temp_table6 (id text, f1 text, f2 text, f3 text, f4 text, f5 text)')
                    cursor.execute(query)

        def execute_repeatedly(query):
            # Run the query, then reschedule itself every 0.01 seconds
            execute_query(query)
            threading.Timer(.01, execute_repeatedly, args=[query]).start()

        execute_repeatedly('select count(*) from test_test')
        execute_repeatedly('select count(*) from test_test')
        execute_repeatedly('select count(*) from test_test')
        execute_repeatedly('select count(*) from test_test')

SQL that triggers the memory allocation error (executed via psql):

    SELECT * FROM pg_logical_slot_get_changes('slot_name', null, null);
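Note: when re-running the reproduction, pg_logical_slot_peek_changes (same arguments as pg_logical_slot_get_changes) can be used to read the stream without consuming it, which makes repeated attempts against the same slot easier:

    -- Reads the pending changes without advancing the slot's confirmed position
    SELECT * FROM pg_logical_slot_peek_changes('slot_name', null, null);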