On Tue, Oct 22, 2019 at 10:52 PM Tomas Vondra
<tomas.von...@2ndquadrant.com> wrote:
>
> I think the patch should do the simplest thing possible, i.e. what it
> does today. Otherwise we'll never get it committed.
>
I found a couple of crashes while reviewing and testing flushing of
open transaction data:
Issue 1:
#0  0x00007f22c5722337 in raise () from /lib64/libc.so.6
#1  0x00007f22c5723a28 in abort () from /lib64/libc.so.6
#2  0x0000000000ec5390 in ExceptionalCondition
(conditionName=0x10ea814 "!dlist_is_empty(head)", errorType=0x10ea804
"FailedAssertion",
    fileName=0x10ea7e0 "../../../../src/include/lib/ilist.h",
lineNumber=458) at assert.c:54
#3  0x0000000000b4fb91 in dlist_tail_element_off (head=0x19e4db8,
off=64) at ../../../../src/include/lib/ilist.h:458
#4  0x0000000000b546d0 in ReorderBufferAbortOld (rb=0x191b6b0,
oldestRunningXid=3834) at reorderbuffer.c:1966
#5  0x0000000000b3ca03 in DecodeStandbyOp (ctx=0x19af990,
buf=0x7ffcbc26dc50) at decode.c:332
#6  0x0000000000b3c208 in LogicalDecodingProcessRecord (ctx=0x19af990,
record=0x19afc50) at decode.c:121
#7  0x0000000000b7109e in XLogSendLogical () at walsender.c:2845
#8  0x0000000000b6f5e4 in WalSndLoop (send_data=0xb70f77
<XLogSendLogical>) at walsender.c:2199
#9  0x0000000000b6c7e1 in StartLogicalReplication (cmd=0x1983168) at
walsender.c:1128
#10 0x0000000000b6da6f in exec_replication_command
(cmd_string=0x18f70a0 "START_REPLICATION SLOT \"sub1\" LOGICAL 0/0
(proto_version '1', publication_names '\"pub1\"')")
    at walsender.c:1545

Issue 2:
#0  0x00007f1d7ddc4337 in raise () from /lib64/libc.so.6
#1  0x00007f1d7ddc5a28 in abort () from /lib64/libc.so.6
#2  0x0000000000ec4e1d in ExceptionalCondition
(conditionName=0x10ead30 "txn->final_lsn != InvalidXLogRecPtr",
errorType=0x10ea284 "FailedAssertion",
    fileName=0x10ea2d0 "reorderbuffer.c", lineNumber=3052) at assert.c:54
#3  0x0000000000b577e0 in ReorderBufferRestoreCleanup (rb=0x2ae36b0,
txn=0x2bafb08) at reorderbuffer.c:3052
#4  0x0000000000b52b1c in ReorderBufferCleanupTXN (rb=0x2ae36b0,
txn=0x2bafb08) at reorderbuffer.c:1318
#5  0x0000000000b5279d in ReorderBufferCleanupTXN (rb=0x2ae36b0,
txn=0x2b9d778) at reorderbuffer.c:1257
#6  0x0000000000b5475c in ReorderBufferAbortOld (rb=0x2ae36b0,
oldestRunningXid=3835) at reorderbuffer.c:1973
#7  0x0000000000b3ca03 in DecodeStandbyOp (ctx=0x2b676d0,
buf=0x7ffcbc74cc00) at decode.c:332
#8  0x0000000000b3c208 in LogicalDecodingProcessRecord (ctx=0x2b676d0,
record=0x2b67990) at decode.c:121
#9  0x0000000000b70b2b in XLogSendLogical () at walsender.c:2845

These failures occur randomly.
I'm not able to reproduce them with a simple test case.
I have attached the test case that I used for testing.
I will keep trying to find a scenario that reproduces them consistently.
I'm posting this now so that others can help identify the problem
in parallel through code review.

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <libpq-fe.h>

/* Number of threads main() starts running largedatathreadfunc (inserts into perftest). */
#define LARGEDATATHREADS 32
/* Number of threads main() starts running smalldatathreadfunc (inserts into perftest_smalldata). */
#define SMALLDATATHREADS 50

/*
 * Fatal-error helper: write the connection's current error message to
 * stderr, release the supplied result and the connection, then terminate
 * the whole process with exit status 1.  Never returns.
 */
void do_exit(PGconn *conn, PGresult *res)
{
    const char *errmsg = PQerrorMessage(conn);

    fprintf(stderr, "%s\n", errmsg);

    /* Release libpq resources before leaving. */
    PQclear(res);
    PQfinish(conn);

    exit(1);
}

/*
 * Prepare the subscriber side of the test.
 *
 * Connects to the server on port 5433, recreates the perftest_smalldata
 * and perftest tables, and creates subscription sub1 for publication pub1.
 * The statements run in the order listed; the first one that does not
 * complete with PGRES_COMMAND_OK ends the process through do_exit().
 */
void initsubscriber()
{
    static const char *const setup_sql[] = {
        "DROP TABLE IF EXISTS perftest_smalldata",
        "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))",
        "DROP TABLE IF EXISTS perftest",
        "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))",
        "create subscription sub1 connection 'host=127.0.0.1 port=5432 dbname=testdb user=user1 password=123' publication pub1"
    };
    const size_t nstatements = sizeof setup_sql / sizeof setup_sql[0];
    size_t idx;
    PGconn *conn;

    conn = PQconnectdb("user=user1 dbname=testdb port=5433");
    if (PQstatus(conn) == CONNECTION_BAD)
    {
        fprintf(stderr, "Connection to database failed: %s\n",
                PQerrorMessage(conn));
        PQfinish(conn);
        exit(1);
    }

    for (idx = 0; idx < nstatements; idx++)
    {
        PGresult *res = PQexec(conn, setup_sql[idx]);

        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            do_exit(conn, res);

        PQclear(res);
    }

    PQfinish(conn);
}

/*
 * Prepare the publisher side of the test.
 *
 * Connects with the default port, recreates the perftest_smalldata and
 * perftest tables, and creates publication pub1 covering both tables for
 * insert, update and delete.  The statements run in the order listed; the
 * first one that does not complete with PGRES_COMMAND_OK ends the process
 * through do_exit().
 */
void initpublisher()
{
    static const char *const setup_sql[] = {
        "DROP TABLE IF EXISTS perftest_smalldata",
        "CREATE TABLE perftest_smalldata(c1 varchar(10), c2 varchar(10), c3 varchar(10), c4 varchar(10), c5 varchar(10))",
        "DROP TABLE IF EXISTS perftest",
        "CREATE TABLE perftest(c1 varchar(100), c2 varchar(100), c3 varchar(100), c4 varchar(100), c5 varchar(100))",
        "CREATE PUBLICATION pub1 FOR TABLE perftest, perftest_smalldata WITH (publish='insert,update,delete')"
    };
    const size_t nstatements = sizeof setup_sql / sizeof setup_sql[0];
    size_t idx;
    PGconn *conn;

    conn = PQconnectdb("user=user1 dbname=testdb");
    if (PQstatus(conn) == CONNECTION_BAD)
    {
        fprintf(stderr, "Connection to database failed: %s\n",
                PQerrorMessage(conn));
        PQfinish(conn);
        exit(1);
    }

    for (idx = 0; idx < nstatements; idx++)
    {
        PGresult *res = PQexec(conn, setup_sql[idx]);

        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            do_exit(conn, res);

        PQclear(res);
    }

    PQfinish(conn);
}

/*
 * Thread body for the small-row workload.
 *
 * recordcount points to an unsigned int holding the number of rows to
 * insert into perftest_smalldata.  The function opens its own connection,
 * starts a transaction, inserts the rows, and issues a SAVEPOINT roughly
 * every reccount/64 inserts.  It then keeps the transaction open for 60
 * seconds, reads the table back, prints the row count and closes the
 * connection without issuing COMMIT.
 *
 * Any connection or statement failure terminates the whole process.
 * Returns NULL.
 */
void* smalldatathreadfunc(void *recordcount) 
{
    unsigned int reccount = *((unsigned int *) recordcount);
    unsigned int count;
    unsigned int subtxncount;
    int savepointcount = 0;
    int rows;
    PGresult *res;
    PGconn *conn = PQconnectdb("user=user1 dbname=testdb");

    if (PQstatus(conn) == CONNECTION_BAD)
    {
        fprintf(stderr, "Connection to database failed: %s\n",
            PQerrorMessage(conn));
        PQfinish(conn);
        exit(1);
    }

    res = PQexec(conn, "BEGIN");
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        do_exit(conn, res);
    }

    PQclear(res);

    /*
     * Interval (in inserts) between savepoints.  For fewer than 64 records
     * the division yields 0, which would make the modulo below divide by
     * zero, so fall back to a savepoint after every insert.
     */
    subtxncount = reccount / 64;
    if (subtxncount == 0)
    {
        subtxncount = 1;
    }

    /* Load data  */
    for (count = 0; count < reccount; count++)
    {
        res = PQexec(conn, "insert into perftest_smalldata values('asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa', 'asafadfafa')");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
            do_exit(conn, res);
        }

        PQclear(res);

        if (count % subtxncount == 0)
        {
            char savepointstr[128] = {0};

            savepointcount++;
            /* Bounded formatting; the name is short, so no truncation occurs. */
            snprintf(savepointstr, sizeof savepointstr, "savepoint sp%d", savepointcount);
            res = PQexec(conn, savepointstr);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
                do_exit(conn, res);
            }

            PQclear(res);
        }
    }

    printf("%d subtransaction created\n", savepointcount);

    /* Hold the transaction open so it is still in progress while decoded. */
    printf("Keeping txn open for 60 seconds\n");
    sleep(60);

    res = PQexec(conn, "SELECT * FROM perftest_smalldata");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        printf("No data retrieved\n");
        PQclear(res);
        PQfinish(conn);
        exit(1);
    }

    rows = PQntuples(res);
    printf("Record count = %d\n", rows);

    PQclear(res);
    PQfinish(conn);
    return NULL;
}


/*
 * Thread body for the large-row workload.
 *
 * recordcount points to an unsigned int giving how many rows to insert
 * into perftest.  The thread uses its own connection: it begins a
 * transaction, performs the inserts, waits 60 seconds with the transaction
 * still open, then selects the table, prints the number of rows returned
 * and closes the connection without issuing COMMIT.
 *
 * A failed connection or statement terminates the whole process.
 * Returns a null pointer.
 */
void* largedatathreadfunc(void *recordcount) 
{
    /* Five identical wide values per row; the text must stay unchanged. */
    static const char insert_sql[] = "insert into perftest values(\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss',\
'asafadfadfdasdasdafsafasfafafsasfasafsdafsfasfasdsdfasfdsfdsasdfsasaddfsadsdasdfssafdsfsfssasdfss')";
    const unsigned int total = *((unsigned int *) recordcount);
    unsigned int done;
    PGresult *result;
    PGconn *db;

    db = PQconnectdb("user=user1 dbname=testdb");
    if (PQstatus(db) == CONNECTION_BAD)
    {
        fprintf(stderr, "Connection to database failed: %s\n",
                PQerrorMessage(db));
        PQfinish(db);
        exit(1);
    }

    result = PQexec(db, "BEGIN");
    if (PQresultStatus(result) != PGRES_COMMAND_OK)
        do_exit(db, result);
    PQclear(result);

    /* Insert the requested number of rows inside the open transaction. */
    for (done = 0; done < total; done++)
    {
        result = PQexec(db, insert_sql);
        if (PQresultStatus(result) != PGRES_COMMAND_OK)
            do_exit(db, result);
        PQclear(result);
    }

    printf("Keeping txn open for 60 seconds\n");
    sleep(60);

    result = PQexec(db, "SELECT * FROM perftest");
    if (PQresultStatus(result) == PGRES_TUPLES_OK)
    {
        printf("Record count = %d\n", PQntuples(result));
        PQclear(result);
        PQfinish(db);
        return NULL;
    }

    /* The query did not return tuples: report and stop the process. */
    printf("No data retrieved\n");
    PQclear(result);
    PQfinish(db);
    exit(1);
}

/*
 * Test driver: set up the publisher and subscriber, then run
 * LARGEDATATHREADS large-row workers and SMALLDATATHREADS small-row
 * workers concurrently and wait for all of them to finish.
 *
 * Each worker kind gets its own record-count variable.  The workers read
 * the count through the pointer at some point after pthread_create()
 * returns, so a single shared variable that is reassigned between the two
 * start-up loops would be a data race and could hand the large-data
 * workers the small-data count.
 */
int main()
{
    pthread_t largedata_thread_id[LARGEDATATHREADS];
    pthread_t smalldata_thread_id[SMALLDATATHREADS];
    /* Both stay unchanged and in scope until every worker has been joined. */
    unsigned int largedata_recordcount = 2500;
    unsigned int smalldata_recordcount = 6000;
    int i;
    int rc;

    initpublisher();
    initsubscriber();

    for (i = 0; i < LARGEDATATHREADS; i++)
    {
        rc = pthread_create(&largedata_thread_id[i], NULL,
                            largedatathreadfunc, &largedata_recordcount);
        if (rc != 0)
        {
            /* Joining a thread id that was never created is undefined. */
            fprintf(stderr, "pthread_create failed: %d\n", rc);
            exit(1);
        }
    }

    for (i = 0; i < SMALLDATATHREADS; i++)
    {
        rc = pthread_create(&smalldata_thread_id[i], NULL,
                            smalldatathreadfunc, &smalldata_recordcount);
        if (rc != 0)
        {
            fprintf(stderr, "pthread_create failed: %d\n", rc);
            exit(1);
        }
    }

    for (i = 0; i < LARGEDATATHREADS; i++)
    {
        pthread_join(largedata_thread_id[i], NULL);
    }

    for (i = 0; i < SMALLDATATHREADS; i++)
    {
        pthread_join(smalldata_thread_id[i], NULL);
    }

    return 0;
}

Reply via email to