Hello, On 2020-Nov-24, Fujii Masao wrote:
> Thanks for working on this!

> Could you tell me the discussion thread where Chloe Dives reported the issue to?
> Sorry I could not find that..

It was not public -- sorry I didn't make that clear.

> I'd like to see the procedure to reproduce the issue.

Here's the script. Thanks!

import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL


def _logical_replication_callback(message):
    """Print one replication message and acknowledge it to the server.

    ``main`` passes this function to ``cursor.consume_stream``, which calls
    it with each message received on the replication stream.

    NOTE(review): the original comment said there is one message (and so one
    call) per committed transaction on the source database; that depends on
    the output plugin in use -- confirm for ``test_decoding``.
    """
    # ``message`` is a replication message object, not a string, so
    # concatenating it to a str raises TypeError; print its payload instead.
    print(f"Raw message: {message.payload}")
    # Report this message's start position as flushed.
    message.cursor.send_feedback(flush_lsn=message.data_start)


def main():
    """Recreate the test replication slot and stream its changes to stdout.

    Connects with a logical-replication connection, drops the slot if it
    already exists, creates it afresh with the ``test_decoding`` output
    plugin, starts replication and hands every message to
    ``_logical_replication_callback``.
    """
    slot_name = 'snitch_papersnap_testing'
    connection = psycopg2.connect(
        host='fab-devdb02',
        port=5432,
        dbname='postgres',
        user='postgres',
        connection_factory=LogicalReplicationConnection,
    )
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT COUNT(*) FROM pg_replication_slots WHERE slot_name = %s",
                (slot_name,),
            )
            (slot_count,) = cursor.fetchone()

            # Always start from a freshly created slot: drop any leftover
            # slot with the same name first, then create it unconditionally.
            if slot_count:
                cursor.drop_replication_slot(slot_name)
            cursor.create_replication_slot(
                slot_name, REPLICATION_LOGICAL, output_plugin='test_decoding'
            )

            cursor.start_replication(slot_name, REPLICATION_LOGICAL, decode=True)
            print("Logical replication started")
            cursor.consume_stream(_logical_replication_callback)
    finally:
        # Release the connection however streaming ends (error or interrupt).
        connection.close()


if __name__ == '__main__':
    main()