I'm working with the logical replication support in psycopg2, and have found 
something surprising... this may be my error, of course!

My sample program is below.  It works wonderfully, except that each time it 
starts, it re-receives the last message it handled, even though it flushed it.

Example:

postgres@localhost:~/wal2pubsub$ python waltest.py 
{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}
^C
postgres@localhost:~/wal2pubsub$ python waltest.py 
{"change":[{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[6]},{"kind":"insert","schema":"public","table":"x","columnnames":["i"],"columntypes":["integer"],"columnvalues":[7]}]}

There was no database activity in that period; it just replayed the same 
message.  Shouldn't it have flushed to the end of the WAL stream and not 
reprocessed the last message?

--

import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL

conn = psycopg2.connect('dbname=postgres', 
connection_factory=LogicalReplicationConnection)
cur = conn.cursor()

cur.start_replication(slot_name='test_slot', slot_type=REPLICATION_LOGICAL)

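def consume(msg):
    # Print the decoded change set, then report this message's start LSN
    # back to the server as flushed.
    print(msg.payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

try:
    cur.consume_stream(consume)
except KeyboardInterrupt:
    # Stop quietly on ^C, as in the example run above.
    pass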
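
To narrow this down, it may help to compare the LSN the program reports with what the server has recorded for the slot. The sketch below is only a debugging aid, not part of the program above: format_lsn and parse_lsn are hypothetical helper names, and it assumes that msg.data_start and msg.wal_end are plain integers and that the server shows LSNs as two hexadecimal halves separated by a slash.

```python
def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN integer as 'high/low' in hexadecimal."""
    return "%X/%X" % (lsn >> 32, lsn & 0xFFFFFFFF)


def parse_lsn(text: str) -> int:
    """Convert a 'high/low' hexadecimal LSN string back to an integer."""
    high, low = text.strip().split("/")
    return (int(high, 16) << 32) | int(low, 16)


def describe(data_start: int, wal_end: int) -> str:
    """Build a one-line summary of the positions attached to a message."""
    return "data_start=%s wal_end=%s" % (format_lsn(data_start),
                                         format_lsn(wal_end))
```

Calling print(describe(msg.data_start, msg.wal_end)) inside consume() and then running SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'test_slot' from a regular session would show whether the flushed position actually moved past the replayed message.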

--
Christophe Pettus
x...@thebuild.com

