On 13-01-24 11:15 AM, Steve Singer wrote:
On 13-01-24 06:40 AM, Andres Freund wrote:
Fair enough. I am also working on a user of this infrastructure but that
doesn't help you very much. Steve Singer seemed to make some stabs at
writing an output plugin as well. Steve, how far did you get there?
I was able to get something that generated output for INSERT
statements in a format similar to what a modified slony apply trigger
would want. This was with the list of tables to replicate hard-coded
in the plugin. This was with the patchset from the last commitfest. I
had gotten a bit hung up on the UPDATE and DELETE support because
slony allows you to use an arbitrary user specified unique index as
your key. It looks like better support for tables with a unique
non-primary key is in the most recent patch set. I am hoping to have
time this weekend to update my plugin to use parameters passed in on
the init and other updates in the most recent version. If I make some
progress I will post a link to my progress at the end of the weekend.
My big issue is that I have limited time to spend on this.
A few more comments;
In decode.c DecodeDelete
+ if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+ {
+ elog(DEBUG2, "huh, no primary key for a delete on wal_level = logical?");
+ return;
+ }
+
I think we should be passing deletes with candidate key data logged to
the plugin. If the table isn't a replicated table then ignoring the
delete is fine. If the table is a replicated table but someone has
deleted the unique index from the table then the plugin will receive
INSERT changes on the table but not DELETE changes. If this happens the
plugin would not have any way of knowing that it is missing delete changes.
If my plugin gets passed a DELETE change record but with no key data
then my plugin could do any of
1. Start screaming for help (ie log errors)
2. Drop the table from replication
3. Pass the delete (with no key values) onto the replication client and
let it deal with it (see 1 and 2)
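
The three options above could be sketched as a small policy switch inside a plugin. This is a standalone illustration rather than code from the patch: the DeleteChange struct, the NoKeyPolicy enum and handle_delete() are hypothetical names, and it assumes decode.c would pass key-less deletes through to the plugin instead of dropping them.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for a decoded DELETE; not a PostgreSQL type. */
typedef struct
{
	const char *table_name;
	bool		has_key;		/* false when no key data was logged */
} DeleteChange;

/* What the plugin is configured to do with a key-less DELETE. */
typedef enum
{
	NOKEY_LOG_ERROR,			/* option 1: scream for help */
	NOKEY_DROP_TABLE,			/* option 2: drop the table from replication */
	NOKEY_FORWARD				/* option 3: let the client deal with it */
} NoKeyPolicy;

/* What the caller should do next with this change. */
typedef enum
{
	ACT_APPLY,
	ACT_ERROR,
	ACT_UNREPLICATE,
	ACT_FORWARD_NO_KEY
} DeleteAction;

DeleteAction
handle_delete(const DeleteChange *change, NoKeyPolicy policy)
{
	assert(change != NULL);

	/* The normal case: key data is present, so the delete can be applied. */
	if (change->has_key)
		return ACT_APPLY;

	switch (policy)
	{
		case NOKEY_LOG_ERROR:
			fprintf(stderr, "DELETE on %s has no key data\n",
					change->table_name);
			return ACT_ERROR;
		case NOKEY_DROP_TABLE:
			return ACT_UNREPLICATE;
		case NOKEY_FORWARD:
			return ACT_FORWARD_NO_KEY;
	}
	return ACT_ERROR;			/* unknown policy: fail loudly */
}
```

None of these choices is possible if the record is silently discarded before the plugin sees it, which is the point of the comment above.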
Also, 'huh' isn't one of our standard log message phrases :)
How do you plan on dealing with sequences?
I don't see my plugin being called on sequence changes and I don't see
XLOG_SEQ_LOG listed in DecodeRecordIntoReorderBuffer. Is there a reason
why this can't be easily added?
Also what do we want to do about TRUNCATE support. I could always leave
a TRUNCATE trigger in place that logged the truncate to a sl_truncates
and have my replication daemon respond to the insert on a sl_truncates
table by actually truncating the data on the replica.
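
As a rough sketch of the daemon side of that workaround: once the replication daemon sees an insert on the sl_truncates table, it would need to turn the logged schema and table name into a TRUNCATE statement for the replica. The helper names below (append_quoted, build_truncate_sql) and the idea that the row carries a schema and a table name are assumptions for illustration only; a real daemon would more likely use libpq's identifier quoting.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Appends ident to buf at offset pos, wrapped in double quotes and with
 * embedded double quotes doubled. Returns the new length, or -1 if the
 * buffer is too small.
 */
static int
append_quoted(char *buf, size_t buflen, size_t pos, const char *ident)
{
	const char *c;

	if (pos + 1 >= buflen)
		return -1;
	buf[pos++] = '"';
	for (c = ident; *c != '\0'; c++)
	{
		if (*c == '"')
		{
			if (pos + 1 >= buflen)
				return -1;
			buf[pos++] = '"';
		}
		if (pos + 1 >= buflen)
			return -1;
		buf[pos++] = *c;
	}
	if (pos + 1 >= buflen)
		return -1;
	buf[pos++] = '"';
	buf[pos] = '\0';
	return (int) pos;
}

/*
 * Builds: TRUNCATE TABLE "schema"."table"
 * Returns the statement length, or -1 if buf is too small.
 */
int
build_truncate_sql(const char *schema, const char *table,
				   char *buf, size_t buflen)
{
	static const char prefix[] = "TRUNCATE TABLE ";
	size_t		plen = sizeof(prefix) - 1;
	int			pos;

	assert(schema != NULL && table != NULL && buf != NULL);
	if (plen + 1 > buflen)
		return -1;
	memcpy(buf, prefix, plen + 1);

	pos = append_quoted(buf, buflen, plen, schema);
	if (pos < 0 || (size_t) pos + 1 >= buflen)
		return -1;
	buf[pos++] = '.';
	return append_quoted(buf, buflen, (size_t) pos, table);
}
```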
I've spent some time this weekend updating my prototype plugin that
generates slony 2.2 style COPY output. I have attached my progress here
(also https://github.com/ssinger/slony1-engine/tree/logical_repl). I
have not gotten as far as modifying slon to act as a logical log
receiver, or made a version of the slony apply trigger that would
process these changes. I haven't looked into the details of what is
involved in setting up a subscription with the snapshot exporting.
I couldn't get the options on the START REPLICATION command to parse so
I just hard coded some list building code in the init method. I do plan
on passing the list of tables to replicate from the replica to the plugin
(because this list comes from the replica). Passing what could be a
few thousand table names as a list of arguments is a bit ugly and I
admit my list processing code is rough. Does this make us want to
reconsider the format of the option_list ?
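
One alternative to a pair of options per table, sketched here only to make the question concrete, would be a single option whose value is a comma-separated list of schema-qualified names. The parser below is standalone C, not plugin code: QualifiedName and parse_table_list() are hypothetical names, and the sketch deliberately ignores quoted identifiers that contain '.' or ','. It also does not free already-copied names when it hits a malformed item.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
	char	   *schema;
	char	   *table;
} QualifiedName;

/* Returns a malloc'd, NUL-terminated copy of the first len bytes of src. */
static char *
copy_range(const char *src, size_t len)
{
	char	   *dst = malloc(len + 1);

	if (dst == NULL)
		abort();
	memcpy(dst, src, len);
	dst[len] = '\0';
	return dst;
}

/*
 * Splits "schema.table,schema.table,..." into at most max_out entries.
 * Returns the number of entries parsed, or -1 on a malformed item or when
 * the list holds more than max_out entries.
 */
int
parse_table_list(const char *list, QualifiedName *out, int max_out)
{
	int			n = 0;
	const char *p = list;

	assert(list != NULL && out != NULL);
	while (*p != '\0')
	{
		const char *end = strchr(p, ',');
		size_t		len = end ? (size_t) (end - p) : strlen(p);
		const char *dot = memchr(p, '.', len);

		/* Reject items with no dot, an empty schema, or an empty table. */
		if (dot == NULL || dot == p || dot == p + len - 1 || n >= max_out)
			return -1;

		out[n].schema = copy_range(p, (size_t) (dot - p));
		out[n].table = copy_range(dot + 1, len - (size_t) (dot - p) - 1);
		n++;
		p = end ? end + 1 : p + len;
	}
	return n;
}
```

With a few thousand tables this still makes for one very long option value, so it only changes where the ugliness lives; it does not remove it.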
I guess I should provide an opinion on whether the patch in this
CF, if committed, could be used to act as a source for slony instead of
the log trigger.
The biggest missing piece is the one I mentioned in my email yesterday:
we aren't logging the old primary key on row UPDATEs. I don't see building
a credible replication system where you don't allow users to update any
column of a row.
The other issues I've raised (DecodeDelete hiding bad deletes,
replication options not parsing for me) look like easy fixes.
The lack of WAL decoding support for sequences and truncate is something I
could work around by doing things much like slony does today. The SYNC
can still capture the sequence changes in a table (where the INSERTs
would be logged) and I can have a trigger capture truncates.
I mostly did this review from the point of view of someone trying to use
the feature; I haven't done a line-by-line review of the code.
I suspect Andres can address these issues and get an updated patch out
during this CF. I think a more detailed code review by someone more
familiar with postgres internals will reveal a handful of other issues
that hopefully can be fixed without a lot of effort. If this were the
only patch in the commitfest I would encourage Andres to push to get
these changes done. If the standard for CF4 is that a patch needs to be
basically in a committable state at the start of the CF, other than minor
issues, then I don't think this patch meets that bar. A few weeks
from now, with a handful more updates and re-reviews, it might.
If we give everyone in the CF that much time to get their patches into a
committable state then I think the CF will drag on until April or even
May and we might not see 9.3 released until close to Christmas (4
patches so far have been rejected or returned with feedback, 51 need
reviewer or committer attention). I'm not sure I have a huge problem
with that but I don't think it is what was agreed to in the developer
meeting last May.
If this patch is going to get bumped to 9.4 I really hope that someone
with good knowledge of the internals (ie a committer) can give this
patch a good review sooner rather than later. If there are issues
Andres has overlooked that are more serious or complicated to fix I
would like to see them raised before the next CF in June.
Steve
BTW, why does all the transaction reordering stuff have to be in core?
It didn't use to, but people argued pretty damned hard that no undecoded
data should ever be allowed to leave the postgres cluster. And to be fair
it makes writing an output plugin *way* easier. Check
http://git.postgresql.org/gitweb/?p=users/andresfreund/postgres.git;a=blob;f=contrib/test_decoding/test_decoding.c;hb=xlog-decoding-rebasing-cf4
If you skip over tuple_to_stringinfo(), which is just pretty generic
scaffolding for converting a whole tuple to a string, writing out the
changes in some format by now is pretty damn simple.
I think we will find that the replication systems won't be the only
users of this feature. I have often seen systems that have a logging
requirement for auditing purposes or to log then reconstruct the
sequence of changes made to a set of tables in order to feed a
downstream application. Triggers and a journaling table are the
traditional way of doing this but it should be pretty easy to write a
plugin to accomplish the same thing that should give better
performance. If the reordering stuff wasn't in core this would be
much harder.
How much of this infrastructure is to support replicating DDL changes?
IOW, if we drop that requirement, how much code can we slash?
Unfortunately I don't think too much unless we add in other code that
allows us to check whether the current definition of a table is still
the same as it was back when the tuple was logged.
Any other features or requirements that could be dropped? I think it's
clear at this stage that this patch is not going to be committed as it
is. If you can reduce it to a fraction of what it is now, that fraction
might have a chance. Otherwise, it's just going to be pushed to the next
commitfest as a whole, and we're going to be having the same doubts and
discussions then.
One thing that reduces complexity is to declare the following as
unsupported:
- CREATE TABLE foo(data text);
- DECODE UP TO HERE;
- INSERT INTO foo(data)
VALUES(very-long-to-be-externally-toasted-tuple);
- DROP TABLE foo;
- DECODE UP TO HERE;
but that's just a minor thing.
I think what we can do more realistically than to chop off required parts
of changeset extraction is to start applying some of the preliminary
patches independently:
- the relmapper/relfilenode changes + pg_relation_by_filenode(spc,
relnode) should be independently committable if a bit boring
- allowing walsenders to connect to a database possibly needs an
interface change but otherwise it should be fine to go in
independently. It also has other potential use-cases, so I think
that's fair.
- logging xl_running_xact's more frequently could also be committed
independently and makes sense independently as it allows a standby to
enter HS faster if the master is busy
- Introducing InvalidCommandId should be relatively uncontroversial. The
fact that no invalid value for command ids exists is imo an oversight
- the *Satisfies changes could be applied and they are imo ready, but
there's no use-case for them without the rest, so I am not sure whether
there's a point
- currently not separately available, but we could add wal_level=logical
independently. There would be no user of it, but it would be partial
work. That includes the relcache support for keeping track of the
primary key which already is available separately.
Greetings,
Andres Freund
/*-------------------------------------------------------------------------
*
*
*
* Copyright (c) 2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
#include "catalog/index.h"
#include "replication/output_plugin.h"
#include "replication/snapbuild.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "fmgr.h"
#include "access/hash.h"
#include "replication/logical.h"
#include "nodes/makefuncs.h"
#include "commands/defrem.h"
PG_MODULE_MAGIC;
void _PG_init(void);
extern void pg_decode_init(LogicalDecodingContext * ctx, bool is_init);
extern bool pg_decode_begin_txn(LogicalDecodingContext * ctx,
ReorderBufferTXN* txn);
extern bool pg_decode_commit_txn(LogicalDecodingContext * ctx,
ReorderBufferTXN* txn, XLogRecPtr commit_lsn);
extern bool pg_decode_change(LogicalDecodingContext * ctx,
ReorderBufferTXN* txn,
Oid tableoid, ReorderBufferChange *change);
extern bool pg_decode_clean(LogicalDecodingContext * ctx);
char * columnAsText(TupleDesc tupdesc, HeapTuple tuple,int idx);
unsigned int local_id=0;
HTAB * replicated_tables=NULL;
typedef struct {
const char * key;
const char * namespace;
const char * table_name;
int set;
} replicated_table;
static uint32
replicated_table_hash(const void *kp, Size ksize)
{
char *key = *((char **) kp);
return hash_any((void *) key, strlen(key));
}
static int
replicated_table_cmp(const void *kp1, const void *kp2, Size ksize)
{
char *key1 = *((char **) kp1);
char *key2 = *((char **) kp2);
return strcmp(key1, key2);
}
bool is_replicated(const char * namespace,const char * table);
void
_PG_init(void)
{
}
void
pg_decode_init(LogicalDecodingContext * ctx, bool is_init)
{
char * table;
bool found;
HASHCTL hctl;
int i ;
List * options = list_make1(makeDefElem("schema_table_1"
,(Node*)makeString("public")));
options = lappend(options,makeDefElem("table_1"
,(Node*)makeString("a")));
options = lappend(options,makeDefElem("schema_table_2"
,(Node*)makeString("public")));
options = lappend(options,makeDefElem("table_2"
,(Node*)makeString("b")));
options = lappend(options,makeDefElem("schema_table_3"
,(Node*)makeString("public")));
options = lappend(options,makeDefElem("table_3"
,(Node*)makeString("c")));
ctx->output_plugin_private = AllocSetContextCreate(TopMemoryContext,
"slony logical context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
AssertVariableIsOfType(&pg_decode_init, LogicalDecodeInitCB);
MemoryContext context = (MemoryContext)ctx->output_plugin_private;
MemoryContext old = MemoryContextSwitchTo(context);
/**
* query the local database to find
* 1. the local_id
*/
elog(NOTICE,"inside of pg_decode_init");
hctl.keysize = sizeof(char*);
hctl.entrysize = sizeof(replicated_table);
hctl.hash = replicated_table_hash;
hctl.match = replicated_table_cmp;
/**
* build a hash table with information on all replicated tables.
*/
replicated_tables=hash_create("replicated_tables",10,&hctl,
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
for(i = 0; i < options->length; i= i + 2 )
{
DefElem * def_schema = (DefElem*) list_nth(options,i);
DefElem * def_table = (DefElem*) list_nth(options,i+1);
const char * schema= defGetString(def_schema);
const char * table_name = defGetString(def_table);
table = palloc(strlen(schema) + strlen(table_name)+2);
sprintf(table,"%s.%s",schema,table_name);
replicated_table * entry=hash_search(replicated_tables,
&table,HASH_ENTER,&found);
entry->key=table;
entry->namespace=pstrdup(schema);
entry->table_name=pstrdup(table_name);
entry->set=1;
}
MemoryContextSwitchTo(old);
}
bool
pg_decode_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN* txn)
{
AssertVariableIsOfType(&pg_decode_begin_txn, LogicalDecodeBeginCB);
/**
* we can ignore the begin and commit. slony operates
* on SYNC boundaries.
*/
elog(NOTICE,"inside of begin");
ctx->prepare_write(ctx,txn->lsn,txn->xid);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
ctx->write(ctx,txn->lsn,txn->xid);
return true;
}
bool
pg_decode_commit_txn( LogicalDecodingContext * ctx,
ReorderBufferTXN* txn, XLogRecPtr commit_lsn)
{
AssertVariableIsOfType(&pg_decode_commit_txn, LogicalDecodeCommitCB);
/**
* we can ignore the begin and commit. slony operates
* on SYNC boundaries.
*/
elog(NOTICE,"inside of commit");
ctx->prepare_write(ctx,txn->lsn,txn->xid);
appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
ctx->write(ctx,txn->lsn,txn->xid);
return true;
}
bool
pg_decode_change(LogicalDecodingContext * ctx, ReorderBufferTXN* txn,
Oid tableoid, ReorderBufferChange *change)
{
Relation relation = RelationIdGetRelation(tableoid);
TupleDesc tupdesc = RelationGetDescr(relation);
Form_pg_class class_form = RelationGetForm(relation);
MemoryContext context = (MemoryContext)ctx->output_plugin_private;
MemoryContext old = MemoryContextSwitchTo(context);
int i;
HeapTuple tuple;
/**
* we build up an array of Datum's so we can convert this
* to an array and use array_out to get a text representation.
* it might be more efficient to leave everything as
* cstrings and write our own quoting/escaping code
*/
Datum *cmdargs = NULL;
Datum *cmdargselem = NULL;
bool *cmdnulls = NULL;
bool *cmdnullselem = NULL;
int cmddims[1];
int cmdlbs[1];
ArrayType *outvalues;
const char * array_text;
Oid arraytypeoutput;
bool arraytypeisvarlena;
HeapTuple array_type_tuple;
FmgrInfo flinfo;
char action='?';
int update_cols=0;
int table_id=0;
const char * table_name;
const char * namespace;
ctx->prepare_write(ctx,txn->lsn,txn->xid);
elog(NOTICE,"inside of pg_decode_change");
namespace=get_namespace_name(class_form->relnamespace);
table_name=NameStr(class_form->relname);
if( ! is_replicated(namespace,table_name) )
{
RelationClose(relation);
MemoryContextSwitchTo(old);
return false;
}
if(change->action == REORDER_BUFFER_CHANGE_INSERT)
{
/**
* convert all columns to a pair of arrays (columns and values)
*/
tuple=&change->newtuple->tuple;
action='I';
cmdargs = cmdargselem = palloc( (relation->rd_att->natts * 2 +2)
* sizeof(Datum) );
cmdnulls = cmdnullselem = palloc( (relation->rd_att->natts *2 + 2)
* sizeof(bool));
for(i = 0; i < relation->rd_att->natts; i++)
{
const char * column;
const char * value;
if(tupdesc->attrs[i]->attisdropped)
continue;
if(tupdesc->attrs[i]->attnum < 0)
continue;
column= NameStr(tupdesc->attrs[i]->attname);
*cmdargselem++=PointerGetDatum(cstring_to_text(column));
*cmdnullselem++=false;
value = columnAsText(tupdesc,tuple,i);
if (value == NULL)
{
*cmdnullselem++=true;
cmdargselem++;
}
else
{
*cmdnullselem++=false;
*cmdargselem++=PointerGetDatum(cstring_to_text(value));
}
}
}
else if (change->action == REORDER_BUFFER_CHANGE_UPDATE)
{
/**
* convert all columns into two pairs of arrays.
* one for key columns, one for non-key columns
*/
action='U';
/**
* we need to find the columns that make up the unique key.
* in the log trigger these were arguments to the trigger
* ie (kvvkk).
* our options are:
*
* 1. search for the unique indexes on the table and pick one
* 2. Lookup the index name from sl_table
* 3. Have the _init() method load a list of all replicated tables
* for this remote and the associated unique key names.
*/
}
else if (change->action == REORDER_BUFFER_CHANGE_DELETE)
{
Relation indexrel;
TupleDesc indexdesc;
/**
* convert the key columns to a pair of arrays.
*/
action='D';
tuple=&change->oldtuple->tuple;
/**
* populate relation->rd_primary with the primary or candidate
* index used to WAL values that specify which row is being deleted.
*/
RelationGetIndexList(relation);
indexrel = RelationIdGetRelation(relation->rd_primary);
indexdesc = RelationGetDescr(indexrel);
cmdargs = cmdargselem = palloc( (indexrel->rd_att->natts * 2 + 2)
* sizeof (Datum));
cmdnulls = cmdnullselem = palloc( (indexrel->rd_att->natts * 2 + 2)
* sizeof(bool));
for(i = 0; i < indexrel->rd_att->natts; i++)
{
const char * column;
const char * value;
if(indexdesc->attrs[i]->attisdropped)
/** you can't drop a column from an index, something is wrong */
continue;
if(indexdesc->attrs[i]->attnum < 0)
continue;
column = NameStr(indexdesc->attrs[i]->attname);
*cmdargselem++= PointerGetDatum(cstring_to_text(column));
*cmdnullselem++=false;
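value = columnAsText(indexdesc,tuple,i);
if (value == NULL)
{
/* columnAsText returns NULL for SQL NULL; don't pass it to cstring_to_text */
*cmdnullselem++=true;
cmdargselem++;
}
else
{
*cmdnullselem++=false;
*cmdargselem++=PointerGetDatum(cstring_to_text(value));
}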
}
RelationClose(indexrel);
}
else
{
/**
* what else?
*/
}
cmddims[0] = cmdargselem - cmdargs;
cmdlbs[0] = 1;
outvalues= construct_md_array(cmdargs,cmdnulls,1,cmddims,cmdlbs,
TEXTOID,-1,false,'i');
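array_type_tuple = SearchSysCache1(TYPEOID,ObjectIdGetDatum(TEXTARRAYOID));
if(!HeapTupleIsValid(array_type_tuple))
elog(ERROR, "cache lookup failed for type %u", TEXTARRAYOID);
array_type_tuple->t_tableOid = InvalidOid;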
getTypeOutputInfo(TEXTARRAYOID,&arraytypeoutput,&arraytypeisvarlena);
fmgr_info(arraytypeoutput,&flinfo);
array_text = DatumGetCString(FunctionCall1Coll(&flinfo,InvalidOid,
PointerGetDatum(outvalues)));
appendStringInfo(ctx->out,"%u,%u,%d,NULL,%s,%s,%c,%d,%s"
,local_id
,txn->xid
,table_id
,namespace
,table_name
,action
,update_cols
,array_text);
RelationClose(relation);
ReleaseSysCache(array_type_tuple);
MemoryContextSwitchTo(old);
ctx->write(ctx,txn->lsn,txn->xid);
elog(NOTICE,"leaving pg_decode_change");
return true;
}
bool pg_decode_clean(LogicalDecodingContext * ctx)
{
return true;
}
/**
* converts the value stored in the attribute/column specified
* to a text string. If the value is NULL then a NULL is returned.
*/
char * columnAsText(TupleDesc tupdesc, HeapTuple tuple,int idx)
{
Oid typid,typeoutput;
bool typisvarlena;
Form_pg_type ftype;
bool isnull;
HeapTuple typeTuple;
Datum origval,val;
char * outputstr=NULL;
typid = tupdesc->attrs[idx]->atttypid;
typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
if(!HeapTupleIsValid(typeTuple))
elog(ERROR, "cache lookup failed for type %u", typid);
ftype = (Form_pg_type) GETSTRUCT(typeTuple);
getTypeOutputInfo(typid,
&typeoutput, &typisvarlena);
ReleaseSysCache(typeTuple);
origval = fastgetattr(tuple, idx + 1, tupdesc, &isnull);
if(typisvarlena && !isnull)
val = PointerGetDatum(PG_DETOAST_DATUM(origval));
else
val = origval;
if (isnull)
return NULL;
outputstr = OidOutputFunctionCall(typeoutput, val);
return outputstr;
}
/**
* checks to see if the table identified by namespace and table is
* replicated from this origin/provider to the receiver
* we are running for.
*/
bool is_replicated(const char * namespace,const char * table)
{
char * search_key = palloc(strlen(namespace) + strlen(table)+2);
replicated_table * entry;
bool found;
sprintf(search_key,"%s.%s",namespace,table);
entry=hash_search(replicated_tables,
&search_key,HASH_FIND,&found);
/* the lookup key is only needed for the search; don't leak it */
pfree(search_key);
return found;
}