Hello,

On 2020-Nov-24, Fujii Masao wrote:

> Thanks for working on this!
> Could you tell me the discussion thread where Chloe Dives reported the issue 
> to?
> Sorry I could not find that..

It was not public -- sorry I didn't make that clear.

> I'd like to see the procedure to reproduce the issue.

Here's the script.


Thanks!
import psycopg2

from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL


def _logical_replication_callback(message):
    ''' Deal with a single audit_json message; see _process_message. We get one message, therefore one
        call to this method, per committed transaction on the source database.
    '''
    print("Raw message: " + message)
    message.cursor.send_feedback(flush_lsn=message.data_start)


def main():
    slot_name = 'snitch_papersnap_testing'

    connection = psycopg2.connect(
        host='fab-devdb02',
        port=5432,
        dbname='postgres',
        user='postgres',
        connection_factory=LogicalReplicationConnection,
    )

    with connection.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM pg_replication_slots WHERE slot_name = %s", (slot_name,))
        slot_exists, = cursor.fetchone()

        if slot_exists:
            cursor.drop_replication_slot(slot_name)
            slot_exists = False

        if not slot_exists:
            cursor.create_replication_slot(slot_name, REPLICATION_LOGICAL, output_plugin='test_decoding')

        cursor.start_replication(slot_name, REPLICATION_LOGICAL, decode=True)
        print("Logical replication started")
        cursor.consume_stream(_logical_replication_callback)


if __name__ == '__main__':
    main()

Reply via email to