On Tue, Oct 22, 2019 at 10:52 PM Tomas Vondra
<[email protected]> wrote:
>
> I think the patch should do the simplest thing possible, i.e. what it
> does today. Otherwise we'll never get it committed.
>
I found a couple of crashes while reviewing and testing flushing of
open transaction data:
Issue 1:
#0 0x00007f22c5722337 in raise () from /lib64/libc.so.6
#1 0x00007f22c5723a28 in abort () from /lib64/libc.so.6
#2 0x0000000000ec5390 in ExceptionalCondition
(conditionName=0x10ea814 "!dlist_is_empty(head)", errorType=0x10ea804
"FailedAssertion",
fileName=0x10ea7e0 "../../../../src/include/lib/ilist.h",
lineNumber=458) at assert.c:54
#3 0x0000000000b4fb91 in dlist_tail_element_off (head=0x19e4db8,
off=64) at ../../../../src/include/lib/ilist.h:458
#4 0x0000000000b546d0 in ReorderBufferAbortOld (rb=0x191b6b0,
oldestRunningXid=3834) at reorderbuffer.c:1966
#5 0x0000000000b3ca03 in DecodeStandbyOp (ctx=0x19af990,
buf=0x7ffcbc26dc50) at decode.c:332
#6 0x0000000000b3c208 in LogicalDecodingProcessRecord (ctx=0x19af990,
record=0x19afc50) at decode.c:121
#7 0x0000000000b7109e in XLogSendLogical () at walsender.c:2845
#8 0x0000000000b6f5e4 in WalSndLoop (send_data=0xb70f77
<XLogSendLogical>) at walsender.c:2199
#9 0x0000000000b6c7e1 in StartLogicalReplication (cmd=0x1983168) at
walsender.c:1128
#10 0x0000000000b6da6f in exec_replication_command
(cmd_string=0x18f70a0 "START_REPLICATION SLOT \"sub1\" LOGICAL 0/0
(proto_version '1', publication_names '\"pub1\"')")
at walsender.c:1545
Issue 2:
#0 0x00007f1d7ddc4337 in raise () from /lib64/libc.so.6
#1 0x00007f1d7ddc5a28 in abort () from /lib64/libc.so.6
#2 0x0000000000ec4e1d in ExceptionalCondition
(conditionName=0x10ead30 "txn->final_lsn != InvalidXLogRecPtr",
errorType=0x10ea284 "FailedAssertion",
fileName=0x10ea2d0 "reorderbuffer.c", lineNumber=3052) at assert.c:54
#3 0x0000000000b577e0 in ReorderBufferRestoreCleanup (rb=0x2ae36b0,
txn=0x2bafb08) at reorderbuffer.c:3052
#4 0x0000000000b52b1c in ReorderBufferCleanupTXN (rb=0y x2ae36b0,
txn=0x2bafb08) at reorderbuffer.c:1318
#5 0x0000000000b5279d in ReorderBufferCleanupTXN (rb=0x2ae36b0,
txn=0x2b9d778) at reorderbuffer.c:1257
#6 0x0000000000b5475c in ReorderBufferAbortOld (rb=0x2ae36b0,
oldestRunningXid=3835) at reorderbuffer.c:1973
#7 0x0000000000b3ca03 in DecodeStandbyOp (ctx=0x2b676d0,
buf=0x7ffcbc74cc00) at decode.c:332
#8 0x0000000000b3c208 in LogicalDecodingProcessRecord (ctx=0x2b676d0,
record=0x2b67990) at decode.c:121
#9 0x0000000000b70b2b in XLogSendLogical () at walsender.c:2845
These failures come randomly.
I'm not able to reproduce this issue with simple test case.
I have attached the test case which I used to test.
I will further try to find a scenario which could reproduce consistently.
Posting it so that it can help someone in identifying the problem
parallelly through code review by experts.
Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>
#define LARGEDATATHREADS 32
#define SMALLDATATHREADS 50
void do_exit(PGconn *conn, PGresult *res)
{
fprintf(stderr, "%s\n", PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
exit(1);
}
void initsubscriber()
{
PGconn *conn = PQconnectdb("user=user1 dbname=testdb port=5433");
if (PQstatus(conn) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database failed: %s\n",
PQerrorMessage(conn));
PQfinish(conn);
exit(1);
}
PGresult *res = PQexec(conn, "DROP TABLE IF EXISTS perftest_smalldata");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "DROP TABLE IF EXISTS perftest");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "create subscription sub1 connection 'host=127.0.0.1 port=5432 dbname=testdb user=user1 password=123' publication pub1");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
PQfinish(conn);
}
void initpublisher()
{
PGconn *conn = PQconnectdb("user=user1 dbname=testdb");
if (PQstatus(conn) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database failed: %s\n",
PQerrorMessage(conn));
PQfinish(conn);
exit(1);
}
PGresult *res = PQexec(conn, "DROP TABLE IF EXISTS perftest_smalldata");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "DROP TABLE IF EXISTS perftest");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
res = PQexec(conn, "CREATE PUBLICATION pub1 FOR TABLE perftest, perftest_smalldata WITH (publish='insert,update,delete')");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
PQfinish(conn);
}
void* smalldatathreadfunc(void *recordcount)
{
unsigned int reccount = *((unsigned int *) recordcount);
int count;
PGresult *res;
PGconn *conn = PQconnectdb("user=user1 dbname=testdb");
if (PQstatus(conn) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database failed: %s\n",
PQerrorMessage(conn));
PQfinish(conn);
exit(1);
}
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
/* Load data */
int subtxncount = reccount/64;
int savepointcount = 0;
for (count = 0; count < reccount; count++)
{
res = PQexec(conn, "insert into perftest_smalldata values('asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa')");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
do_exit(conn, res);
PQclear(res);
if (count % subtxncount == 0)
{
char savepointstr[128] = {0};
savepointcount++;
sprintf(savepointstr, "savepoint sp%d", savepointcount);
res = PQexec(conn, savepointstr);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
do_exit(conn, res);
PQclear(res);
}
}
printf("%d subtransaction created\n", savepointcount);
printf("Keeping txn open for 60 seconds\n");
sleep(60);
res = PQexec(conn, "SELECT * FROM perftest_smalldata");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
printf("No data retrieved\n");
PQclear(res);
PQfinish(conn);
exit(1);
}
int rows = PQntuples(res);
printf("Record count = %d\n", rows);
PQclear(res);
PQfinish(conn);
return 0;
}
void* largedatathreadfunc(void *recordcount)
{
unsigned int reccount = *((unsigned int *) recordcount);
int count;
PGresult *res;
PGconn *conn = PQconnectdb("user=user1 dbname=testdb");
if (PQstatus(conn) == CONNECTION_BAD)
{
fprintf(stderr, "Connection to database failed: %s\n",
PQerrorMessage(conn));
PQfinish(conn);
exit(1);
}
res = PQexec(conn, "BEGIN");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
do_exit(conn, res);
}
PQclear(res);
/* Load data */
for (count = 0; count < reccount; count++)
{
res = PQexec(conn, "insert into perftest values(\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss')");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
do_exit(conn, res);
PQclear(res);
}
printf("Keeping txn open for 60 seconds\n");
sleep(60);
res = PQexec(conn, "SELECT * FROM perftest");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
printf("No data retrieved\n");
PQclear(res);
PQfinish(conn);
exit(1);
}
int rows = PQntuples(res);
printf("Record count = %d\n", rows);
PQclear(res);
PQfinish(conn);
return 0;
}
int main()
{
pthread_t largedata_thread_id[LARGEDATATHREADS];
pthread_t smalldata_thread_id[SMALLDATATHREADS];
int i;
int recordcount;
initpublisher();
initsubscriber();
for(i=0; i < LARGEDATATHREADS; i++)
{
recordcount = 2500;
pthread_create( &largedata_thread_id[i], NULL, largedatathreadfunc, &recordcount);
}
for(i=0; i < SMALLDATATHREADS; i++)
{
recordcount = 6000;
pthread_create( &smalldata_thread_id[i], NULL, smalldatathreadfunc, &recordcount);
}
for(i=0; i < LARGEDATATHREADS; i++)
{
pthread_join( largedata_thread_id[i], NULL);
}
for(i=0; i < SMALLDATATHREADS; i++)
{
pthread_join( smalldata_thread_id[i], NULL);
}
}