On 13-01-24 11:15 AM, Steve Singer wrote:
On 13-01-24 06:40 AM, Andres Freund wrote:

Fair enough. I am also working on a user of this infrastructure but that
doesn't help you very much. Steve Singer seemed to make some stabs at
writing an output plugin as well. Steve, how far did you get there?

I was able to get something that generated output for INSERT statements in a format similar to what a modified slony apply trigger would want. This was with the list of tables to replicate hard-coded in the plugin, and with the patchset from the last commitfest.

I had gotten a bit hung up on the UPDATE and DELETE support because slony allows you to use an arbitrary user-specified unique index as your key. It looks like better support for tables with a unique non-primary key is in the most recent patch set. I am hoping to have time this weekend to update my plugin to use parameters passed in on the init and other updates in the most recent version. If I make some progress I will post a link to it at the end of the weekend. My big issue is that I have limited time to spend on this.




A few more comments;

In decode.c DecodeDelete

+   if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+   {
+ elog(DEBUG2, "huh, no primary key for a delete on wal_level = logical?");
+       return;
+   }
+

I think we should be passing deletes to the plugin even when no candidate key data was logged. If the table isn't a replicated table then ignoring the delete is fine. If the table is a replicated table but someone has dropped the unique index from the table then the plugin will receive INSERT changes on the table but not DELETE changes. If this happens the plugin would not have any way of knowing that it is missing delete changes. If my plugin gets passed a DELETE change record but with no key data then my plugin could do any of
1. Start screaming for help (i.e. log errors)
2. Drop the table from replication
3. Pass the delete (with no key values) on to the replication client and let it deal with it (see 1 and 2)
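
To make the three options concrete, here is a rough sketch of how a plugin might choose between them. Everything in it (the enum names, the delete_change struct, decide_delete) is made up for illustration and is not part of the patch or of my plugin; it only assumes that the plugin is told whether key data was logged.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical policy names, one per option in the list above. */
typedef enum {
    KEYLESS_LOG_ERROR,   /* option 1: complain loudly */
    KEYLESS_DROP_TABLE,  /* option 2: stop replicating the table */
    KEYLESS_FORWARD      /* option 3: pass the delete downstream */
} keyless_policy;

/* Hypothetical summary of a DELETE change as seen by the plugin. */
typedef struct {
    const char *table;
    bool        replicated; /* is the table in the replicated set? */
    bool        has_key;    /* false when no candidate key was logged */
} delete_change;

typedef enum { ACT_IGNORE, ACT_EMIT, ACT_ERROR, ACT_UNSUBSCRIBE } plugin_action;

static plugin_action
decide_delete(const delete_change *chg, keyless_policy policy)
{
    assert(chg != NULL);

    if (!chg->replicated)
        return ACT_IGNORE;      /* not our table, nothing to do */
    if (chg->has_key)
        return ACT_EMIT;        /* normal case */

    /* Replicated table, but the delete carries no key data. */
    switch (policy) {
        case KEYLESS_LOG_ERROR:
            fprintf(stderr, "DELETE on %s carries no key data\n", chg->table);
            return ACT_ERROR;
        case KEYLESS_DROP_TABLE:
            return ACT_UNSUBSCRIBE;
        case KEYLESS_FORWARD:
            return ACT_EMIT;
    }
    return ACT_ERROR;
}
```

The point is only that the decision belongs in the plugin. None of these branches can run if decode.c drops the record before the plugin sees it.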

Also, 'huh' isn't one of our standard log message phrases :)


How do you plan on dealing with sequences?
I don't see my plugin being called on sequence changes and I don't see XLOG_SEQ_LOG listed in DecodeRecordIntoReorderBuffer. Is there a reason why this can't be easily added?

Also, what do we want to do about TRUNCATE support? I could always leave a TRUNCATE trigger in place that logs the truncate to an sl_truncates table, and have my replication daemon respond to the insert on sl_truncates by actually truncating the data on the replica.

I've spent some time this weekend updating my prototype plugin that generates slony 2.2 style COPY output. I have attached my progress here (also https://github.com/ssinger/slony1-engine/tree/logical_repl). I have not gotten as far as modifying slon to act as a logical log receiver, or made a version of the slony apply trigger that would process these changes. I haven't looked into the details of what is involved in setting up a subscription with the snapshot exporting.

I couldn't get the options on the START REPLICATION command to parse so I just hard-coded some list-building code in the init method. I do plan on passing the list of tables to replicate from the replica to the plugin (because this list comes from the replica). Passing what could be a few thousand table names as a list of arguments is a bit ugly and I admit my list processing code is rough. Does this make us want to reconsider the format of the option_list?
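
One alternative I could imagine (purely a sketch, not something the patch supports today) is passing the whole table list as a single option value such as "public.a,public.b" and splitting it in the plugin. The table_ref struct, MAX_NAME limit and parse_table_list below are hypothetical names, and the code ignores quoting, so names containing '.' or ',' would not work.

```c
#include <assert.h>
#include <string.h>

#define MAX_NAME 64   /* assumed limit for this sketch */

typedef struct {
    char schema[MAX_NAME];
    char table[MAX_NAME];
} table_ref;

/*
 * Split "public.a,public.b" into table_ref entries.
 * Returns the number of entries parsed, or -1 on malformed input
 * or when more than max entries are present.
 */
static int
parse_table_list(const char *value, table_ref *out, int max)
{
    const char *p = value;
    int         n = 0;

    assert(value != NULL && out != NULL);

    while (*p != '\0') {
        size_t      item_len = strcspn(p, ",");
        const char *dot = memchr(p, '.', item_len);
        size_t      schema_len;
        size_t      table_len;

        if (dot == NULL || n >= max)
            return -1;
        schema_len = (size_t) (dot - p);
        table_len = item_len - schema_len - 1;
        if (schema_len == 0 || table_len == 0 ||
            schema_len >= MAX_NAME || table_len >= MAX_NAME)
            return -1;

        memcpy(out[n].schema, p, schema_len);
        out[n].schema[schema_len] = '\0';
        memcpy(out[n].table, dot + 1, table_len);
        out[n].table[table_len] = '\0';
        n++;

        p += item_len;
        if (*p == ',')
            p++;            /* skip the separator */
    }
    return n;
}
```

This keeps the option_list short, at the cost of pushing a second mini-format into every plugin, so I am not sure it is actually better than fixing the option_list itself.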

I guess I should provide an opinion on whether the patch in this CF, if committed, could be used as a source for slony instead of the log trigger.


The biggest missing piece is the one I mentioned in my email yesterday: we aren't logging the old primary key on row UPDATEs. I don't see how to build a credible replication system where you don't allow users to update any column of a row.
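
A toy illustration of why the old key matters, using an in-memory array as the "replica". The row struct and apply_update are invented for this example and have nothing to do with the patch's data structures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
    int id;        /* key column */
    int payload;
} row;

/*
 * Toy equivalent of
 *   UPDATE t SET id = new_id, payload = new_payload WHERE id = match_id
 * Returns false when no row matches match_id.
 */
static bool
apply_update(row *rows, size_t nrows, int match_id, int new_id, int new_payload)
{
    size_t i;

    assert(rows != NULL);
    for (i = 0; i < nrows; i++) {
        if (rows[i].id == match_id) {
            rows[i].id = new_id;
            rows[i].payload = new_payload;
            return true;
        }
    }
    return false;
}
```

If the origin runs UPDATE t SET id = 5 WHERE id = 1 and only the new tuple is logged, the replica can only search for id = 5 and finds nothing. With the old key (id = 1) available it finds the row and applies the change.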

The other issues I've raised (DecodeDelete hiding bad deletes, replication options not parsing for me) look like easy fixes.

The lack of WAL decoding support for sequences and TRUNCATE is something I could work around by doing things much like slony does today. The SYNC can still capture the sequence changes in a table (where the INSERTs would be logged) and I can have a trigger capture truncates.

I mostly did this review from the point of view of someone trying to use the feature; I haven't done a line-by-line review of the code.

I suspect Andres can address these issues and get an updated patch out during this CF. I think a more detailed code review by someone more familiar with postgres internals will reveal a handful of other issues that hopefully can be fixed without a lot of effort. If this were the only patch in the commitfest I would encourage Andres to push to get these changes done. If the standard for CF4 is that a patch needs to be basically in a committable state at the start of the CF, other than minor issues, then I don't think this patch meets that bar. A few weeks from now, with a handful more updates and re-reviews, it might. If we give everyone in the CF that much time to get their patches into a committable state then I think the CF will drag on until April or even May and we might not see 9.3 released until close to Christmas (4 patches so far have been rejected or returned with feedback, 51 need reviewer or committer attention). I'm not sure I have a huge problem with that but I don't think it is what was agreed to in the developer meeting last May.

If this patch is going to get bumped to 9.4 I really hope that someone with good knowledge of the internals (ie a committer) can give this patch a good review sooner rather than later. If there are issues Andres has overlooked that are more serious or complicated to fix I would like to see them raised before the next CF in June.

Steve




BTW, why does all the transaction reordering stuff have to be in core?
It didn't use to, but people argued pretty damned hard that no undecoded
data should ever be allowed to leave the postgres cluster. And to be fair
it makes writing an output plugin *way* easier. Check
http://git.postgresql.org/gitweb/?p=users/andresfreund/postgres.git;a=blob;f=contrib/test_decoding/test_decoding.c;hb=xlog-decoding-rebasing-cf4
If you skip over tuple_to_stringinfo(), which is just pretty generic
scaffolding for converting a whole tuple to a string, writing out the
changes in some format by now is pretty damn simple.


I think we will find that the replication systems won't be the only users of this feature. I have often seen systems that have a logging requirement for auditing purposes or to log then reconstruct the sequence of changes made to a set of tables in order to feed a downstream application. Triggers and a journaling table are the traditional way of doing this but it should be pretty easy to write a plugin to accomplish the same thing that should give better performance. If the reordering stuff wasn't in core this would be much harder.


How much of this infrastructure is to support replicating DDL changes? IOW,
if we drop that requirement, how much code can we slash?
Unfortunately I don't think too much unless we add in other code that
allows us to check whether the current definition of a table is still
the same as it was back when the tuple was logged.

Any other features or requirements that could be dropped? I think it's clear at this stage that this patch is not going to be committed as it is. If you can reduce it to a fraction of what it is now, that fraction might have a chance. Otherwise,
it's just going to be pushed to the next commitfest as a whole, and we're
going to be having the same doubts and discussions then.
One thing that reduces complexity is to declare the following as
unsupported:
- CREATE TABLE foo(data text);
- DECODE UP TO HERE;
- INSERT INTO foo(data) VALUES(very-long-to-be-externally-toasted-tuple);
- DROP TABLE foo;
- DECODE UP TO HERE;

but that's just a minor thing.

I think what we can do more realistically than to chop off required parts
of changeset extraction is to start applying some of the preliminary
patches independently:
- the relmapper/relfilenode changes + pg_relation_by_filenode(spc,
   relnode) should be independently committable if a bit boring
- allowing walsenders to connect to a database possibly needs an interface change
   but otherwise it should be fine to go in independently. It also has
   other potential use-cases, so I think that's fair.
- logging xl_running_xact's more frequently could also be committed
   independently and makes sense independently as it allows a standby to
   enter HS faster if the master is busy
- Introducing InvalidCommandId should be relatively uncontroversial. The
   fact that no invalid value for command ids exists is imo an oversight
- the *Satisfies change could be applied and they are imo ready but
   there's no use-case for it without the rest, so I am not sure whether
   there's a point
- currently not separately available, but we could add wal_level=logical
   independently. There would be no user of it, but it would be partial
   work. That includes the relcache support for keeping track of the
   primary key which already is available separately.


Greetings,

Andres Freund





/*-------------------------------------------------------------------------
 *
 * 
 *
 * Copyright (c) 2012, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		  
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
#include "catalog/index.h"

#include "replication/output_plugin.h"
#include "replication/snapbuild.h"

#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "fmgr.h"
#include "access/hash.h"
#include "replication/logical.h"
#include "nodes/makefuncs.h"
#include "commands/defrem.h"

PG_MODULE_MAGIC;

void _PG_init(void);


extern void pg_decode_init(LogicalDecodingContext * ctx, bool is_init);

extern bool pg_decode_begin_txn(LogicalDecodingContext * ctx, 
								ReorderBufferTXN* txn);
extern bool pg_decode_commit_txn(LogicalDecodingContext * ctx,
								 ReorderBufferTXN* txn, XLogRecPtr commit_lsn);
extern bool pg_decode_change(LogicalDecodingContext * ctx, 
							 ReorderBufferTXN* txn,
							 Oid tableoid, ReorderBufferChange *change);

extern bool pg_decode_clean(LogicalDecodingContext * ctx);

char * columnAsText(TupleDesc tupdesc, HeapTuple tuple,int idx);

unsigned int local_id=0;

HTAB * replicated_tables=NULL;

typedef struct  {
	const char * key;
	const char * namespace;
	const char * table_name;
	int set;

} replicated_table;

static uint32
replicated_table_hash(const void *kp, Size ksize)
{
	char	   *key = *((char **) kp);
	return hash_any((void *) key, strlen(key));
}
static int
replicated_table_cmp(const void *kp1, const void *kp2, Size ksize)
{
	char	   *key1 = *((char **) kp1);
	char	   *key2 = *((char **) kp2);
	return strcmp(key1, key2);
}

bool is_replicated(const char * namespace,const char * table);


void
_PG_init(void)
{
}




void
pg_decode_init(LogicalDecodingContext * ctx, bool is_init)
{	
	char * table;
	bool found;
	HASHCTL hctl;
	int i ;

	


	List * options = list_make1(makeDefElem("schema_table_1"
											,(Node*)makeString("public")));
	options = lappend(options,makeDefElem("table_1"
										  ,(Node*)makeString("a")));
	options = lappend(options,makeDefElem("schema_table_2"
										  ,(Node*)makeString("public")));
	options = lappend(options,makeDefElem("table_2"
										  ,(Node*)makeString("b")));
	options = lappend(options,makeDefElem("schema_table_3"
										  ,(Node*)makeString("public")));
	options = lappend(options,makeDefElem("table_3"
										  ,(Node*)makeString("c")));


	ctx->output_plugin_private = AllocSetContextCreate(TopMemoryContext,
									 "slony logical  context",
									 ALLOCSET_DEFAULT_MINSIZE,
									 ALLOCSET_DEFAULT_INITSIZE,
									 ALLOCSET_DEFAULT_MAXSIZE);


	AssertVariableIsOfType(&pg_decode_init, LogicalDecodeInitCB);
	MemoryContext context = (MemoryContext)ctx->output_plugin_private;
	MemoryContext old = MemoryContextSwitchTo(context);
											

	/**
	 * query the local database to find
	 * 1. the local_id
	 */
	elog(NOTICE,"inside of pg_decode_init");
	hctl.keysize = sizeof(char*);
	hctl.entrysize = sizeof(replicated_table);
	hctl.hash = replicated_table_hash;
	hctl.match = replicated_table_cmp;

	/**
	 * build a hash table with information on all replicated tables.
	 */
	replicated_tables=hash_create("replicated_tables",10,&hctl,
								  HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
	for(i = 0; i < options->length; i= i + 2 )
	{
		DefElem * def_schema = (DefElem*) list_nth(options,i);
		DefElem * def_table = (DefElem*) list_nth(options,i+1);
		const char * schema= defGetString(def_schema);
		const char * table_name = defGetString(def_table);

		table = palloc(strlen(schema) + strlen(table_name)+2);
		sprintf(table,"%s.%s",schema,table_name);
		replicated_table * entry=hash_search(replicated_tables,
											 &table,HASH_ENTER,&found);
		entry->key=table;
		entry->namespace=pstrdup(schema);
		entry->table_name=pstrdup(table_name);
		entry->set=1;
	}
	MemoryContextSwitchTo(old);
}


bool
pg_decode_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN* txn)
{
	AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
	/**
	 * we can ignore the begin and commit. slony operates
	 * on SYNC boundaries.
	 */
	elog(NOTICE,"inside of begin");
	ctx->prepare_write(ctx,txn->lsn,txn->xid);
	appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
	ctx->write(ctx,txn->lsn,txn->xid);
	return true;
}

bool
pg_decode_commit_txn( LogicalDecodingContext * ctx,
					 ReorderBufferTXN* txn, XLogRecPtr commit_lsn)
{
	AssertVariableIsOfType(&pg_decode_commit_txn, LogicalDecodeCommitCB);
	/**
	 * we can ignore the begin and commit. slony operates
	 * on SYNC boundaries.
	 */
	elog(NOTICE,"inside of commit");
	ctx->prepare_write(ctx,txn->lsn,txn->xid);
	appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
	ctx->write(ctx,txn->lsn,txn->xid);
	return true;
}



bool
pg_decode_change(LogicalDecodingContext * ctx, ReorderBufferTXN* txn,
				 Oid tableoid, ReorderBufferChange *change)
{

	
	Relation relation = RelationIdGetRelation(tableoid);	
	TupleDesc	tupdesc = RelationGetDescr(relation);
	Form_pg_class class_form = RelationGetForm(relation);
	MemoryContext context = (MemoryContext)ctx->output_plugin_private;
	MemoryContext old = MemoryContextSwitchTo(context);
	int i;
	HeapTuple tuple;
	/**
	 * we build up an array of Datum's so we can convert this
	 * to an array and use array_out to get a text representation.
	 * it might be more efficient to leave everything as 
	 * cstrings and write our own quoting/escaping code
	 */
	Datum		*cmdargs = NULL;
	Datum		*cmdargselem = NULL;
	bool	   *cmdnulls = NULL;
	bool	   *cmdnullselem = NULL;
	int		   cmddims[1];
	int        cmdlbs[1];
	ArrayType  *outvalues;
	const char * array_text;
	Oid arraytypeoutput;
	bool  arraytypeisvarlena;
	HeapTuple array_type_tuple;
	FmgrInfo flinfo;
	char action='?';
	int update_cols=0;
	int table_id=0;
	const char * table_name;
	const char * namespace;
	ctx->prepare_write(ctx,txn->lsn,txn->xid);
	elog(NOTICE,"inside of pg_decode_change");

	
	namespace=get_namespace_name(class_form->relnamespace);
	table_name=NameStr(class_form->relname);

	if( ! is_replicated(namespace,table_name) ) 
	{
		RelationClose(relation);
		MemoryContextSwitchTo(old);
		return false;
	}

	if(change->action == REORDER_BUFFER_CHANGE_INSERT)
	{		
		/**
		 * convert all columns to a pair of arrays (columns and values)
		 */
		tuple=&change->newtuple->tuple;
		action='I';
		cmdargs = cmdargselem = palloc( (relation->rd_att->natts * 2 +2) 
										* sizeof(Datum)  );
		cmdnulls = cmdnullselem = palloc( (relation->rd_att->natts *2 + 2) 
										  * sizeof(bool));
		
		

		for(i = 0; i < relation->rd_att->natts; i++)
		{
			const char * column;			
			const char * value;

			if(tupdesc->attrs[i]->attisdropped)
				continue;
			if(tupdesc->attrs[i]->attnum < 0)
				continue;
			column= NameStr(tupdesc->attrs[i]->attname);
			*cmdargselem++=PointerGetDatum(cstring_to_text(column));
			*cmdnullselem++=false;
			
			value = columnAsText(tupdesc,tuple,i);
			if (value == NULL) 
			{
			  *cmdnullselem++=true;
			  cmdargselem++;
			}
			else
			{
				*cmdnullselem++=false;
				*cmdargselem++=PointerGetDatum(cstring_to_text(value));
			}			
			
		}   
		
		
	}
	else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
	{
		/**
		 * convert all columns into two pairs of arrays.
		 * one for key columns, one for non-key columns
		 */
		action='U';
		
		/**
		 * we need to find the columns that make up the unique key.
		 * in the log trigger these were arguments to the trigger
		 * ie (kvvkk).  
		 * our options are:
		 *
		 * 1. search for the unique indexes on the table and pick one
		 * 2. Lookup the index name from sl_table
		 * 3. Have the _init() method load a list of all replicated tables
		 *    for this remote and the associated unique key names.
		 */
		
	}
	else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
	{
	  Relation indexrel;
	  TupleDesc indexdesc;
		/**
		 * convert the key columns to a pair of arrays.
		 */
	  action='D';
	  tuple=&change->oldtuple->tuple;
	  
	  /**
	   * populate relation->rd_primary with the primary or candidate
	   * index used to WAL values that specify which row is being deleted.
	   */
	  RelationGetIndexList(relation);

	  indexrel = RelationIdGetRelation(relation->rd_primary);
	  indexdesc = RelationGetDescr(indexrel);
	  

	  cmdargs = cmdargselem = palloc( (indexrel->rd_att->natts * 2 + 2)
									  * sizeof (Datum));
	  cmdnulls = cmdnullselem = palloc( (indexrel->rd_att->natts * 2 + 2)
										* sizeof(bool));
	  for(i = 0; i < indexrel->rd_att->natts; i++)
	  {
		  const char * column;
		  const char * value;
		  
		  if(indexdesc->attrs[i]->attisdropped)
			  /** you can't drop a column from an index, something is wrong */
			  continue;
		  if(indexdesc->attrs[i]->attnum < 0)
			  continue;
		  column = NameStr(indexdesc->attrs[i]->attname);
		  *cmdargselem++= PointerGetDatum(cstring_to_text(column));
		  *cmdnullselem++=false;
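		  value = columnAsText(indexdesc,tuple,i);
		  /* a unique (non-primary) key column can be NULL */
		  if (value == NULL)
		  {
			  *cmdnullselem++=true;
			  cmdargselem++;
		  }
		  else
		  {
			  *cmdnullselem++=false;
			  *cmdargselem++=PointerGetDatum(cstring_to_text(value));
		  }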
	  }
	  RelationClose(indexrel);
	}
	else
	{
		/**
		 * what else?
		 */
	}   



	cmddims[0] = cmdargselem - cmdargs;
	cmdlbs[0] = 1;
	outvalues= construct_md_array(cmdargs,cmdnulls,1,cmddims,cmdlbs,
								 TEXTOID,-1,false,'i');
	array_type_tuple = SearchSysCache1(TYPEOID,TEXTARRAYOID);
	array_type_tuple->t_tableOid = InvalidOid;
	getTypeOutputInfo(TEXTARRAYOID,&arraytypeoutput,&arraytypeisvarlena);
	fmgr_info(arraytypeoutput,&flinfo);
	array_text = DatumGetCString(FunctionCall1Coll(&flinfo,InvalidOid,
												   PointerGetDatum(outvalues)));
	
	appendStringInfo(ctx->out,"%u,%u,%d,NULL,%s,%s,%c,%d,%s"
					 ,local_id
					 ,txn->xid
					 ,table_id
					 ,namespace
					 ,table_name
					 ,action
					 ,update_cols
					 ,array_text);
	RelationClose(relation);
	ReleaseSysCache(array_type_tuple);
	
	
	
	MemoryContextSwitchTo(old);
	ctx->write(ctx,txn->lsn,txn->xid);
	elog(NOTICE,"leaving pg_decode_change");
	return true;
}


bool pg_decode_clean(LogicalDecodingContext * ctx)
{
	return true;
}

/**
 * converts the value stored in the attribute/column specified
 * to a text string.  If the value is NULL then a NULL is returned.
 */
char * columnAsText(TupleDesc tupdesc, HeapTuple tuple,int idx)
{
	Oid typid,typeoutput;
	bool		typisvarlena;
	Form_pg_type ftype;
	bool isnull;
	HeapTuple typeTuple;
	Datum origval,val;
	char * outputstr=NULL;

	typid = tupdesc->attrs[idx]->atttypid;

	typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
	
	if(!HeapTupleIsValid(typeTuple)) 
		elog(ERROR, "cache lookup failed for type %u", typid);
	ftype = (Form_pg_type) GETSTRUCT(typeTuple);
	
	getTypeOutputInfo(typid,
					  &typeoutput, &typisvarlena);

	ReleaseSysCache(typeTuple);
	
	origval = fastgetattr(tuple, idx + 1, tupdesc, &isnull);
	if(typisvarlena && !isnull) 
		val = PointerGetDatum(PG_DETOAST_DATUM(origval));
	else
		val = origval;
	if (isnull)
		return NULL;
	outputstr = OidOutputFunctionCall(typeoutput, val);
	return outputstr;
}

/**
 * checks to see if the table described by class_form is
 * replicated from this origin/provider to the receiver
 * we are running for.
 */
bool is_replicated(const char * namespace,const char * table)
{
	char * search_key = palloc(strlen(namespace) + strlen(table)+2);
	replicated_table * entry;
	bool found;

	sprintf(search_key,"%s.%s",namespace,table);
    entry=hash_search(replicated_tables,
					  &search_key,HASH_FIND,&found);
	
	/* the key is only needed for the lookup; don't leak it per change */
	pfree(search_key);
	return found;
	
}