On Tue, Oct 22, 2019 at 10:52 PM Tomas Vondra <tomas.von...@2ndquadrant.com> wrote: > > I think the patch should do the simplest thing possible, i.e. what it > does today. Otherwise we'll never get it committed. > I found a couple of crashes while reviewing and testing flushing of open transaction data: Issue 1: #0 0x00007f22c5722337 in raise () from /lib64/libc.so.6 #1 0x00007f22c5723a28 in abort () from /lib64/libc.so.6 #2 0x0000000000ec5390 in ExceptionalCondition (conditionName=0x10ea814 "!dlist_is_empty(head)", errorType=0x10ea804 "FailedAssertion", fileName=0x10ea7e0 "../../../../src/include/lib/ilist.h", lineNumber=458) at assert.c:54 #3 0x0000000000b4fb91 in dlist_tail_element_off (head=0x19e4db8, off=64) at ../../../../src/include/lib/ilist.h:458 #4 0x0000000000b546d0 in ReorderBufferAbortOld (rb=0x191b6b0, oldestRunningXid=3834) at reorderbuffer.c:1966 #5 0x0000000000b3ca03 in DecodeStandbyOp (ctx=0x19af990, buf=0x7ffcbc26dc50) at decode.c:332 #6 0x0000000000b3c208 in LogicalDecodingProcessRecord (ctx=0x19af990, record=0x19afc50) at decode.c:121 #7 0x0000000000b7109e in XLogSendLogical () at walsender.c:2845 #8 0x0000000000b6f5e4 in WalSndLoop (send_data=0xb70f77 <XLogSendLogical>) at walsender.c:2199 #9 0x0000000000b6c7e1 in StartLogicalReplication (cmd=0x1983168) at walsender.c:1128 #10 0x0000000000b6da6f in exec_replication_command (cmd_string=0x18f70a0 "START_REPLICATION SLOT \"sub1\" LOGICAL 0/0 (proto_version '1', publication_names '\"pub1\"')") at walsender.c:1545
Issue 2: #0 0x00007f1d7ddc4337 in raise () from /lib64/libc.so.6 #1 0x00007f1d7ddc5a28 in abort () from /lib64/libc.so.6 #2 0x0000000000ec4e1d in ExceptionalCondition (conditionName=0x10ead30 "txn->final_lsn != InvalidXLogRecPtr", errorType=0x10ea284 "FailedAssertion", fileName=0x10ea2d0 "reorderbuffer.c", lineNumber=3052) at assert.c:54 #3 0x0000000000b577e0 in ReorderBufferRestoreCleanup (rb=0x2ae36b0, txn=0x2bafb08) at reorderbuffer.c:3052 #4 0x0000000000b52b1c in ReorderBufferCleanupTXN (rb=0x2ae36b0, txn=0x2bafb08) at reorderbuffer.c:1318 #5 0x0000000000b5279d in ReorderBufferCleanupTXN (rb=0x2ae36b0, txn=0x2b9d778) at reorderbuffer.c:1257 #6 0x0000000000b5475c in ReorderBufferAbortOld (rb=0x2ae36b0, oldestRunningXid=3835) at reorderbuffer.c:1973 #7 0x0000000000b3ca03 in DecodeStandbyOp (ctx=0x2b676d0, buf=0x7ffcbc74cc00) at decode.c:332 #8 0x0000000000b3c208 in LogicalDecodingProcessRecord (ctx=0x2b676d0, record=0x2b67990) at decode.c:121 #9 0x0000000000b70b2b in XLogSendLogical () at walsender.c:2845 These failures occur randomly. I'm not able to reproduce them with a simple test case. I have attached the test case which I used to test. I will keep trying to find a scenario that reproduces them consistently. I am posting this now so that others can help identify the problem in parallel through code review by experts. Regards, Vignesh EnterpriseDB: http://www.enterprisedb.com
#include <stdio.h> #include <stdlib.h> #include <libpq-fe.h> #define LARGEDATATHREADS 32 #define SMALLDATATHREADS 50 void do_exit(PGconn *conn, PGresult *res) { fprintf(stderr, "%s\n", PQerrorMessage(conn)); PQclear(res); PQfinish(conn); exit(1); } void initsubscriber() { PGconn *conn = PQconnectdb("user=user1 dbname=testdb port=5433"); if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); PQfinish(conn); exit(1); } PGresult *res = PQexec(conn, "DROP TABLE IF EXISTS perftest_smalldata"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "DROP TABLE IF EXISTS perftest"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "create subscription sub1 connection 'host=127.0.0.1 port=5432 dbname=testdb user=user1 password=123' publication pub1"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); PQfinish(conn); } void initpublisher() { PGconn *conn = PQconnectdb("user=user1 dbname=testdb"); if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); PQfinish(conn); exit(1); } PGresult *res = PQexec(conn, "DROP TABLE IF EXISTS perftest_smalldata"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))"); if (PQresultStatus(res) != 
PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "DROP TABLE IF EXISTS perftest"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); res = PQexec(conn, "CREATE PUBLICATION pub1 FOR TABLE perftest, perftest_smalldata WITH (publish='insert,update,delete')"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); PQfinish(conn); } void* smalldatathreadfunc(void *recordcount) { unsigned int reccount = *((unsigned int *) recordcount); int count; PGresult *res; PGconn *conn = PQconnectdb("user=user1 dbname=testdb"); if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); PQfinish(conn); exit(1); } res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); /* Load data */ int subtxncount = reccount/64; int savepointcount = 0; for (count = 0; count < reccount; count++) { res = PQexec(conn, "insert into perftest_smalldata values('asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa')"); if (PQresultStatus(res) != PGRES_COMMAND_OK) do_exit(conn, res); PQclear(res); if (count % subtxncount == 0) { char savepointstr[128] = {0}; savepointcount++; sprintf(savepointstr, "savepoint sp%d", savepointcount); res = PQexec(conn, savepointstr); if (PQresultStatus(res) != PGRES_COMMAND_OK) do_exit(conn, res); PQclear(res); } } printf("%d subtransaction created\n", savepointcount); printf("Keeping txn open for 60 seconds\n"); sleep(60); res = PQexec(conn, "SELECT * FROM perftest_smalldata"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { printf("No data retrieved\n"); PQclear(res); PQfinish(conn); exit(1); } int rows = PQntuples(res); printf("Record count = %d\n", 
rows); PQclear(res); PQfinish(conn); return 0; } void* largedatathreadfunc(void *recordcount) { unsigned int reccount = *((unsigned int *) recordcount); int count; PGresult *res; PGconn *conn = PQconnectdb("user=user1 dbname=testdb"); if (PQstatus(conn) == CONNECTION_BAD) { fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn)); PQfinish(conn); exit(1); } res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) { do_exit(conn, res); } PQclear(res); /* Load data */ for (count = 0; count < reccount; count++) { res = PQexec(conn, "insert into perftest values(\ 'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\ 'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\ 'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\ 'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\ 'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss')"); if (PQresultStatus(res) != PGRES_COMMAND_OK) do_exit(conn, res); PQclear(res); } printf("Keeping txn open for 60 seconds\n"); sleep(60); res = PQexec(conn, "SELECT * FROM perftest"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { printf("No data retrieved\n"); PQclear(res); PQfinish(conn); exit(1); } int rows = PQntuples(res); printf("Record count = %d\n", rows); PQclear(res); PQfinish(conn); return 0; } int main() { pthread_t largedata_thread_id[LARGEDATATHREADS]; pthread_t smalldata_thread_id[SMALLDATATHREADS]; int i; int recordcount; initpublisher(); initsubscriber(); for(i=0; i < LARGEDATATHREADS; i++) { recordcount = 2500; pthread_create( &largedata_thread_id[i], NULL, largedatathreadfunc, &recordcount); } for(i=0; i < SMALLDATATHREADS; i++) { recordcount = 6000; pthread_create( &smalldata_thread_id[i], NULL, smalldatathreadfunc, &recordcount); } for(i=0; i < 
LARGEDATATHREADS; i++) { pthread_join( largedata_thread_id[i], NULL); } for(i=0; i < SMALLDATATHREADS; i++) { pthread_join( smalldata_thread_id[i], NULL); } }