Hi,

I've suspected for a while that the separation & duplication of
infrastructure between walsenders and normal backends isn't nice.  But
looking at the code changes in v10 drove that home quite a bit.

The major changes in this area are
1) d1ecd539477fe640455dc890216a7c1561e047b4 - Add a SHOW command to the 
replication command language.
2) 7c4f52409a8c7d85ed169bbbc1f6092274d03920 - Logical replication support for 
initial data copy

The first adds SHOW support for the replication protocol, with such
oddities as "SHOW hba_file;" working but "show hba_file;" not.  Unless
you're connected to a database, in which case feature 2) kicks in and
falls back to the normal SQL parser (as it would for most other SQL
commands).
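
To make the SHOW oddity concrete, here is a minimal sketch of the two
matching styles.  It assumes the replication scanner compares keywords
byte for byte while the SQL scanner folds case first; the function names
are made up for illustration and are not the actual scanner code.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/* Byte-for-byte comparison: "show" is not the keyword "SHOW". */
bool
match_keyword_exact(const char *word, const char *keyword)
{
	assert(word != NULL && keyword != NULL);
	return strcmp(word, keyword) == 0;
}

/* Fold ASCII case before comparing, so "show" and "SHOW" both match. */
bool
match_keyword_folded(const char *word, const char *keyword)
{
	assert(word != NULL && keyword != NULL);

	for (; *word != '\0' && *keyword != '\0'; word++, keyword++)
	{
		if (toupper((unsigned char) *word) !=
			toupper((unsigned char) *keyword))
			return false;
	}

	/* both strings must end at the same position */
	return *word == '\0' && *keyword == '\0';
}
```

With the exact variant "show" falls through to whatever the fallback is,
which is why the behaviour depends on whether you're connected to a
database.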

With 2) we can execute normal SQL over a walsender connection. That's
useful for logical rep because it needs to be able to copy data (via
e.g. COPY), read catalogs (SELECT) and stream data (via
START_REPLICATION).

The problem is that we now end up with fairly similar infrastructure,
that's duplicated in walsender and normal backends. We've already seen a
couple of bugs stem from this:
- parallel queries hang when executed over a walsender connection
- query cancel doesn't work over a walsender connection
- parser/scanner hack deciding between replication and normal grammar
  isn't correct
- SnapBuildClearExportedSnapshot isn't called reliably anymore
- config files aren't reloaded while idle in a walsender connection

While all of those are fixable, and several of them already have
proposed fixes, I think this is some indication that the current split
isn't well thought out.  I think in hindsight, the whole idea of
introducing a separate protocol/language for the replication protocol
was a rather bad one.

I think we should give up on the current course, and decide to unify as
much as possible.  Petr already proposed some fixes for the above
(unifying signal and config file handling), but I don't think that
solves the more fundamental issue of different grammars.

For me it's fairly obvious that we should try to merge the two grammars,
and live with the slight ugliness that'll be caused by doing so.  The
primary problem here is that both languages "look" different, primarily
in that the replication protocol uses enforced-all-caps-with-underscores
as style.

Attached is a very rough POC, that unifies the two grammars.
Unsurprisingly that requires the addition of a bunch of additional
keywords, but at least they're all unreserved.  Both grammars seem to
mostly merge well, except for repl_scanner.l's RECPTR which I wasn't
able to add to scan.l within a couple minutes (therefore single quotes
are now necessary).  This really is WIP, and not meant as a patch to be
reviewed in detail, but just as something to be discussed.
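
For reference, the quoted form amounts to parsing a string constant such
as '16/B374D848' into a 64 bit position.  A standalone sketch of that
conversion, modelled on the recptr rule in the attached patch;
parse_recptr is a hypothetical helper name, unsigned int is assumed to
be 32 bits wide, and the rejection of trailing characters is an addition
the POC doesn't have:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Parse a WAL position written as two hex halves ("hi/lo") into a 64 bit
 * value.  Returns false if the text does not have that shape.
 */
bool
parse_recptr(const char *text, uint64_t *result)
{
	unsigned int hi;
	unsigned int lo;
	char		junk;

	assert(text != NULL && result != NULL);

	/* the trailing %c only converts if something follows the two halves */
	if (sscanf(text, "%X/%X%c", &hi, &lo, &junk) != 2)
		return false;

	*result = ((uint64_t) hi << 32) | lo;
	return true;
}
```

The caller would pass the contents of the string constant without the
surrounding quotes, as the scanner has already stripped those.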

I'm probably going to be tarred and feathered for this position, but I
think we need to fix this before v10 comes out.  We're taking on
quite some architectural burden here, and I'm pretty sure we'd otherwise
have to fix it in v11, so we'd be stuck with some odd-duck v10 that
behaves differently from all other versions.

If so, we'd have to:
- gate catalog access more carefully in !am_db_walsender connections
- find a better way to deal with repl_scanner.l's RECPTR
- remove replnodes.h (or move at least parts of it into parsenodes.h)
- do a lot of other minor cleanup

Greetings,

Andres Freund
From 63866c2c8f9c968957dfb0d5b8c5ceca2c4787ed Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 24 Apr 2017 22:57:49 -0700
Subject: [PATCH] WIP: POC: Prototype: Unify SQL and replication grammars.

---
 src/backend/commands/event_trigger.c               |   8 +-
 src/backend/parser/analyze.c                       |  10 +
 src/backend/parser/gram.y                          | 396 ++++++++++++++++++--
 src/backend/replication/Makefile                   |   5 +-
 .../libpqwalreceiver/libpqwalreceiver.c            |   2 +-
 src/backend/replication/logical/snapbuild.c        |   5 +-
 src/backend/replication/repl_gram.y                | 408 ---------------------
 src/backend/replication/repl_scanner.l             | 248 -------------
 src/backend/replication/walsender.c                | 124 ++-----
 src/backend/tcop/postgres.c                        |  69 ++--
 src/backend/tcop/pquery.c                          |   2 +
 src/backend/tcop/utility.c                         |  46 +++
 src/bin/pg_basebackup/pg_recvlogical.c             |   2 +-
 src/bin/pg_basebackup/receivelog.c                 |   2 +-
 src/include/nodes/nodes.h                          |   2 +-
 src/include/nodes/replnodes.h                      |   8 +
 src/include/parser/gramparse.h                     |   1 +
 src/include/parser/kwlist.h                        |  18 +
 src/include/replication/walsender.h                |   5 +-
 19 files changed, 538 insertions(+), 823 deletions(-)
 delete mode 100644 src/backend/replication/repl_gram.y
 delete mode 100644 src/backend/replication/repl_scanner.l

diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index d7c199f314..1e441fbe86 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -295,7 +295,13 @@ check_ddl_tag(const char *tag)
 		pg_strcasecmp(tag, "REVOKE") == 0 ||
 		pg_strcasecmp(tag, "DROP OWNED") == 0 ||
 		pg_strcasecmp(tag, "IMPORT FOREIGN SCHEMA") == 0 ||
-		pg_strcasecmp(tag, "SECURITY LABEL") == 0)
+		pg_strcasecmp(tag, "SECURITY LABEL") == 0 ||
+		pg_strcasecmp(tag, "IDENTIFY_SYSTEM") == 0 ||
+		pg_strcasecmp(tag, "BASE_BACKUP") == 0 ||
+		pg_strcasecmp(tag, "CREATE_REPLICATION_SLOT") == 0 ||
+		pg_strcasecmp(tag, "DROP_REPLICATION_SLOT") == 0 ||
+		pg_strcasecmp(tag, "START_REPLICATION") == 0 ||
+		pg_strcasecmp(tag, "TIMELINE_HISTORY") == 0)
 		return EVENT_TRIGGER_COMMAND_TAG_OK;
 
 	/*
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 567dd54c6c..d969c0a4e0 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -42,6 +42,7 @@
 #include "parser/parse_target.h"
 #include "parser/parsetree.h"
 #include "rewrite/rewriteManip.h"
+#include "replication/walsender.h"
 #include "utils/rel.h"
 
 
@@ -104,6 +105,15 @@ parse_analyze(RawStmt *parseTree, const char *sourceText,
 
 	pstate->p_sourcetext = sourceText;
 
+	if (am_walsender && !am_db_walsender &&
+		analyze_requires_snapshot(parseTree))
+	{
+		/* FIXME: message */
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("non-replication statement not supported in pure replication connection")));
+	}
+
 	if (numParams > 0)
 		parse_fixed_parameters(pstate, paramTypes, numParams);
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 89d2836c49..6ede6d6af0 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -48,6 +48,7 @@
 #include <ctype.h>
 #include <limits.h>
 
+#include "access/xlogdefs.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_am.h"
@@ -56,9 +57,12 @@
 #include "commands/trigger.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
+#include "nodes/replnodes.h"
 #include "parser/gramparse.h"
 #include "parser/parser.h"
 #include "parser/parse_expr.h"
+#include "replication/walsender.h"
+#include "replication/walsender_private.h"
 #include "storage/lmgr.h"
 #include "utils/date.h"
 #include "utils/datetime.h"
@@ -241,6 +245,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	PartitionSpec		*partspec;
 	PartitionRangeDatum	*partrange_datum;
 	RoleSpec			*rolespec;
+	XLogRecPtr			recptr;
+	uint32				uintval;
 }
 
 %type <node>	stmt schema_stmt
@@ -584,6 +590,25 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partrange_datum>	PartitionRangeDatum
 %type <list>		range_datum_list
 
+
+/* replication stuff */
+%type <node>	replication_command
+%type <node>	base_backup start_replication start_logical_replication
+				create_replication_slot drop_replication_slot identify_system
+				timeline_history
+%type <list>	base_backup_opt_list
+%type <defelt>	base_backup_opt
+%type <uintval>	opt_timeline
+%type <list>	plugin_options plugin_opt_list
+%type <defelt>	plugin_opt_elem
+%type <node>	plugin_opt_arg
+%type <str>		opt_slot
+%type <boolean>	opt_temporary
+%type <list>	create_slot_opt_list
+%type <defelt>	create_slot_opt
+%type <recptr>	recptr
+
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -610,7 +635,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	AGGREGATE ALL ALSO ALTER ALWAYS ANALYSE ANALYZE AND ANY ARRAY AS ASC
 	ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION
 
-	BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
+	BACKWARD BASE_BACKUP BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT
 	BOOLEAN_P BOTH BY
 
 	CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P
@@ -618,29 +643,29 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT
 	COMMITTED CONCURRENTLY CONFIGURATION CONFLICT CONNECTION CONSTRAINT
 	CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE
-	CROSS CSV CUBE CURRENT_P
+	CREATE_REPLICATION_SLOT CROSS CSV CUBE CURRENT_P
 	CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA
 	CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
 
 	DATA_P DATABASE DAY_P DEALLOCATE DEC DECIMAL_P DECLARE DEFAULT DEFAULTS
 	DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DEPENDS DESC
 	DETACH DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P
-	DOUBLE_P DROP
+	DOUBLE_P DROP DROP_REPLICATION_SLOT
 
 	EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT EXCEPT
-	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
+	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPORT_SNAPSHOT
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FAST_P FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING
 
 	HANDLER HAVING HEADER_P HOLD HOUR_P
 
-	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P
-	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
-	INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
+	IDENTITY_P IDENTIFY_SYSTEM IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P
+	IMPORT_P IN_P INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY
+	INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
 	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
 
 	JOIN
@@ -649,45 +674,46 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LOGICAL_P
 
-	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
+	MAPPING MATCH MATERIALIZED MAXVALUE MAX_RATE METHOD MINUTE_P MINVALUE MODE
+	MONTH_P MOVE
 
-	NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE
+	NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NOEXPORT_SNAPSHOT NONE
 	NOREFRESH NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF
 	NULLS_P NUMERIC
 
 	OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OPTIONS OR
 	ORDER ORDINALITY OUT_P OUTER_P OVER OVERLAPS OVERLAY OVERRIDING OWNED OWNER
 
-	PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POLICY
-	POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
-	PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PUBLICATION
+	PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PHYSICAL PLACING PLANS
+	POLICY POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
+	PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PROGRESS PUBLICATION
 
 	QUOTE
 
 	RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REFERENCING
 	REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA
-	RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
-	ROW ROWS RULE
+	RESERVE_WAL RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE
+	ROLLBACK ROLLUP ROW ROWS RULE
 
 	SAVEPOINT SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES
 	SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW
 	SIMILAR SIMPLE SKIP SLOT SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P
-	START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P
-	SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P
+	START START_REPLICATION STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P
+	STRIP_P SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P
 
-	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P
-	TRUNCATE TRUSTED TYPE_P TYPES_P
+	TABLE TABLES TABLESAMPLE TABLESPACE TABLESPACE_MAP_P TEMP TEMPLATE TEMPORARY
+	TEXT_P THEN TIME TIMELINE_P TIMELINE_HISTORY TIMESTAMP TO TRAILING TRANSACTION
+	TRANSFORM TREAT TRIGGER TRIM TRUE_P	TRUNCATE TRUSTED TYPE_P TYPES_P
 
 	UNBOUNDED UNCOMMITTED UNENCRYPTED UNION UNIQUE UNKNOWN UNLISTEN UNLOGGED
-	UNTIL UPDATE USER USING
+	UNTIL UPDATE USE_SNAPSHOT USER USING
 
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAL_P WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -942,6 +968,7 @@ stmt :
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| replication_command
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -14487,6 +14514,311 @@ AexprConst: Iconst
 				}
 		;
 
+/* replication protocol stuff */
+
+replication_command:
+			identify_system
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| base_backup
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| start_replication
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| start_logical_replication
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| create_replication_slot
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| drop_replication_slot
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+			| timeline_history
+			{
+				ReplicationCmd *rcmd = makeNode(ReplicationCmd);
+				rcmd->replicationCmd = $1;
+				$$ = (Node *) rcmd;
+			}
+;
+
+
+/*
+ * IDENTIFY_SYSTEM
+ */
+identify_system:
+			IDENTIFY_SYSTEM
+				{
+					$$ = (Node *) makeNode(IdentifySystemCmd);
+				}
+			;
+
+/*
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * [MAX_RATE %d] [TABLESPACE_MAP]
+ */
+base_backup:
+			BASE_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					$$ = (Node *) cmd;
+				}
+			;
+
+base_backup_opt_list:
+			base_backup_opt_list base_backup_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+base_backup_opt:
+			LABEL SCONST
+				{
+				  $$ = makeDefElem("label",
+								   (Node *)makeString($2), -1);
+				}
+			| PROGRESS
+				{
+				  $$ = makeDefElem("progress",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| FAST_P
+				{
+				  $$ = makeDefElem("fast",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| WAL_P
+				{
+				  $$ = makeDefElem("wal",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| NOWAIT
+				{
+				  $$ = makeDefElem("nowait",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| MAX_RATE ICONST
+				{
+				  $$ = makeDefElem("max_rate",
+								   (Node *)makeInteger($2), -1);
+				}
+			| TABLESPACE_MAP_P
+				{
+				  $$ = makeDefElem("tablespace_map",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			;
+
+create_replication_slot:
+			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
+			CREATE_REPLICATION_SLOT IDENT opt_temporary PHYSICAL create_slot_opt_list
+				{
+					CreateReplicationSlotCmd *cmd;
+					cmd = makeNode(CreateReplicationSlotCmd);
+					cmd->kind = REPLICATION_KIND_PHYSICAL;
+					cmd->slotname = $2;
+					cmd->temporary = $3;
+					cmd->options = $5;
+					$$ = (Node *) cmd;
+				}
+			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
+			| CREATE_REPLICATION_SLOT IDENT opt_temporary LOGICAL_P IDENT create_slot_opt_list
+				{
+					CreateReplicationSlotCmd *cmd;
+					cmd = makeNode(CreateReplicationSlotCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;
+					cmd->slotname = $2;
+					cmd->temporary = $3;
+					cmd->plugin = $5;
+					cmd->options = $6;
+					$$ = (Node *) cmd;
+				}
+			;
+
+create_slot_opt_list:
+			create_slot_opt_list create_slot_opt
+				{ $$ = lappend($1, $2); }
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+create_slot_opt:
+			EXPORT_SNAPSHOT
+				{
+				  $$ = makeDefElem("export_snapshot",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| NOEXPORT_SNAPSHOT
+				{
+				  $$ = makeDefElem("export_snapshot",
+								   (Node *)makeInteger(FALSE), -1);
+				}
+			| USE_SNAPSHOT
+				{
+				  $$ = makeDefElem("use_snapshot",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			| RESERVE_WAL
+				{
+				  $$ = makeDefElem("reserve_wal",
+								   (Node *)makeInteger(TRUE), -1);
+				}
+			;
+
+/* DROP_REPLICATION_SLOT slot */
+drop_replication_slot:
+			DROP_REPLICATION_SLOT IDENT
+				{
+					DropReplicationSlotCmd *cmd;
+					cmd = makeNode(DropReplicationSlotCmd);
+					cmd->slotname = $2;
+					$$ = (Node *) cmd;
+				}
+			;
+
+recptr:
+			SCONST
+			   {
+					uint32	hi,
+							lo;
+					if (sscanf($1, "%X/%X", &hi, &lo) != 2)
+							ereport(ERROR,
+									(errcode(ERRCODE_SYNTAX_ERROR),
+									 errmsg("invalid frakbar"),
+									 parser_errposition(@1)));
+					$$ = ((uint64) hi) << 32 | lo;
+				}
+
+/*
+ * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
+ */
+start_replication:
+			START_REPLICATION opt_slot opt_physical recptr opt_timeline
+				{
+					StartReplicationCmd *cmd;
+
+					cmd = makeNode(StartReplicationCmd);
+					cmd->kind = REPLICATION_KIND_PHYSICAL;
+					cmd->slotname = $2;
+					cmd->startpoint = $4;
+					cmd->timeline = $5;
+					$$ = (Node *) cmd;
+				}
+			;
+
+/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
+start_logical_replication:
+			START_REPLICATION SLOT IDENT LOGICAL_P recptr plugin_options
+				{
+					StartReplicationCmd *cmd;
+					cmd = makeNode(StartReplicationCmd);
+					cmd->kind = REPLICATION_KIND_LOGICAL;
+					cmd->slotname = $3;
+					cmd->startpoint = $5;
+					cmd->options = $6;
+					$$ = (Node *) cmd;
+				}
+			;
+/*
+ * TIMELINE_HISTORY %d
+ */
+timeline_history:
+			TIMELINE_HISTORY ICONST
+				{
+					TimeLineHistoryCmd *cmd;
+
+					if ($2 <= 0)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 (errmsg("invalid timeline %u", $2))));
+
+					cmd = makeNode(TimeLineHistoryCmd);
+					cmd->timeline = $2;
+
+					$$ = (Node *) cmd;
+				}
+			;
+
+opt_physical:
+			PHYSICAL
+			| /* EMPTY */
+			;
+
+opt_temporary:
+			TEMPORARY						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
+opt_slot:
+			SLOT IDENT
+				{ $$ = $2; }
+			| /* EMPTY */
+				{ $$ = NULL; }
+			;
+
+opt_timeline:
+			TIMELINE_P ICONST
+				{
+					if ($2 <= 0)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 (errmsg("invalid timeline %u", $2))));
+					$$ = $2;
+				}
+				| /* EMPTY */			{ $$ = 0; }
+			;
+
+
+plugin_options:
+			'(' plugin_opt_list ')'			{ $$ = $2; }
+			| /* EMPTY */					{ $$ = NIL; }
+		;
+
+plugin_opt_list:
+			plugin_opt_elem
+				{
+					$$ = list_make1($1);
+				}
+			| plugin_opt_list ',' plugin_opt_elem
+				{
+					$$ = lappend($1, $3);
+				}
+		;
+
+plugin_opt_elem:
+			IDENT plugin_opt_arg
+				{
+					$$ = makeDefElem($1, $2, -1);
+				}
+		;
+
+plugin_opt_arg:
+			SCONST							{ $$ = (Node *) makeString($1); }
+			| /* EMPTY */					{ $$ = NULL; }
+		;
+
+
 Iconst:		ICONST									{ $$ = $1; };
 Sconst:		SCONST									{ $$ = $1; };
 
@@ -14646,6 +14978,7 @@ unreserved_keyword:
 			| ATTACH
 			| ATTRIBUTE
 			| BACKWARD
+			| BASE_BACKUP
 			| BEFORE
 			| BEGIN_P
 			| BY
@@ -14674,6 +15007,7 @@ unreserved_keyword:
 			| CONVERSION_P
 			| COPY
 			| COST
+			| CREATE_REPLICATION_SLOT
 			| CSV
 			| CUBE
 			| CURRENT_P
@@ -14699,6 +15033,7 @@ unreserved_keyword:
 			| DOMAIN_P
 			| DOUBLE_P
 			| DROP
+			| DROP_REPLICATION_SLOT
 			| EACH
 			| ENABLE_P
 			| ENCODING
@@ -14711,9 +15046,11 @@ unreserved_keyword:
 			| EXCLUSIVE
 			| EXECUTE
 			| EXPLAIN
+			| EXPORT_SNAPSHOT
 			| EXTENSION
 			| EXTERNAL
 			| FAMILY
+			| FAST_P
 			| FILTER
 			| FIRST_P
 			| FOLLOWING
@@ -14728,6 +15065,7 @@ unreserved_keyword:
 			| HEADER_P
 			| HOLD
 			| HOUR_P
+			| IDENTIFY_SYSTEM
 			| IDENTITY_P
 			| IF_P
 			| IMMEDIATE
@@ -14761,10 +15099,12 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LOGICAL_P
 			| MAPPING
 			| MATCH
 			| MATERIALIZED
 			| MAXVALUE
+			| MAX_RATE
 			| METHOD
 			| MINUTE_P
 			| MINVALUE
@@ -14776,6 +15116,7 @@ unreserved_keyword:
 			| NEW
 			| NEXT
 			| NO
+			| NOEXPORT_SNAPSHOT
 			| NOREFRESH
 			| NOTHING
 			| NOTIFY
@@ -14800,6 +15141,7 @@ unreserved_keyword:
 			| PARTITION
 			| PASSING
 			| PASSWORD
+			| PHYSICAL
 			| PLANS
 			| POLICY
 			| PRECEDING
@@ -14811,6 +15153,7 @@ unreserved_keyword:
 			| PROCEDURAL
 			| PROCEDURE
 			| PROGRAM
+			| PROGRESS
 			| PUBLICATION
 			| QUOTE
 			| RANGE
@@ -14828,6 +15171,7 @@ unreserved_keyword:
 			| REPEATABLE
 			| REPLACE
 			| REPLICA
+			| RESERVE_WAL
 			| RESET
 			| RESTART
 			| RESTRICT
@@ -14862,6 +15206,7 @@ unreserved_keyword:
 			| STABLE
 			| STANDALONE_P
 			| START
+			| START_REPLICATION
 			| STATEMENT
 			| STATISTICS
 			| STDIN
@@ -14874,10 +15219,13 @@ unreserved_keyword:
 			| SYSTEM_P
 			| TABLES
 			| TABLESPACE
+			| TABLESPACE_MAP_P
 			| TEMP
 			| TEMPLATE
 			| TEMPORARY
 			| TEXT_P
+			| TIMELINE_P
+			| TIMELINE_HISTORY
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -14893,6 +15241,7 @@ unreserved_keyword:
 			| UNLOGGED
 			| UNTIL
 			| UPDATE
+			| USE_SNAPSHOT
 			| VACUUM
 			| VALID
 			| VALIDATE
@@ -14903,6 +15252,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAL_P
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index da8bcf0471..62f8f3a440 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -15,15 +15,12 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
 
 OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
-	repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o
+	slot.o slotfuncs.o syncrep.o syncrep_gram.o
 
 SUBDIRS = logical
 
 include $(top_srcdir)/src/backend/common.mk
 
-# repl_scanner is compiled as part of repl_gram
-repl_gram.o: repl_scanner.c
-
 # syncrep_scanner is complied as part of syncrep_gram
 syncrep_gram.o: syncrep_scanner.c
 syncrep_scanner.c: FLEXFLAGS = -CF -p -i
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 9d7bb25d39..26a53333f8 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -352,7 +352,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 	if (options->logical)
 		appendStringInfo(&cmd, " LOGICAL");
 
-	appendStringInfo(&cmd, " %X/%X",
+	appendStringInfo(&cmd, " '%X/%X'",
 					 (uint32) (options->startpoint >> 32),
 					 (uint32) options->startpoint);
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 358ec28932..eb261ea02f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -589,8 +589,7 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 	Snapshot	snap;
 	char	   *snapname;
 
-	if (IsTransactionOrTransactionBlock())
-		elog(ERROR, "cannot export a snapshot from within a transaction");
+	Assert(IsTransactionOrTransactionBlock());
 
 	if (SavedResourceOwnerDuringExport)
 		elog(ERROR, "can only export one snapshot at a time");
@@ -598,8 +597,6 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 	SavedResourceOwnerDuringExport = CurrentResourceOwner;
 	ExportInProgress = true;
 
-	StartTransactionCommand();
-
 	/* There doesn't seem to a nice API to set these */
 	XactIsoLevel = XACT_REPEATABLE_READ;
 	XactReadOnly = true;
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
deleted file mode 100644
index ec047c827c..0000000000
--- a/src/backend/replication/repl_gram.y
+++ /dev/null
@@ -1,408 +0,0 @@
-%{
-/*-------------------------------------------------------------------------
- *
- * repl_gram.y				- Parser for the replication commands
- *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- *	  src/backend/replication/repl_gram.y
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "access/xlogdefs.h"
-#include "nodes/makefuncs.h"
-#include "nodes/replnodes.h"
-#include "replication/walsender.h"
-#include "replication/walsender_private.h"
-
-
-/* Result of the parsing is returned here */
-Node *replication_parse_result;
-
-static SQLCmd *make_sqlcmd(void);
-
-
-/*
- * Bison doesn't allocate anything that needs to live across parser calls,
- * so we can easily have it use palloc instead of malloc.  This prevents
- * memory leaks if we error out during parsing.  Note this only works with
- * bison >= 2.0.  However, in bison 1.875 the default is to use alloca()
- * if possible, so there's not really much problem anyhow, at least if
- * you're building with gcc.
- */
-#define YYMALLOC palloc
-#define YYFREE   pfree
-
-%}
-
-%expect 0
-%name-prefix="replication_yy"
-
-%union {
-		char					*str;
-		bool					boolval;
-		uint32					uintval;
-
-		XLogRecPtr				recptr;
-		Node					*node;
-		List					*list;
-		DefElem					*defelt;
-}
-
-/* Non-keyword tokens */
-%token <str> SCONST IDENT
-%token <uintval> UCONST
-%token <recptr> RECPTR
-%token T_WORD
-
-/* Keyword tokens. */
-%token K_BASE_BACKUP
-%token K_IDENTIFY_SYSTEM
-%token K_SHOW
-%token K_START_REPLICATION
-%token K_CREATE_REPLICATION_SLOT
-%token K_DROP_REPLICATION_SLOT
-%token K_TIMELINE_HISTORY
-%token K_LABEL
-%token K_PROGRESS
-%token K_FAST
-%token K_NOWAIT
-%token K_MAX_RATE
-%token K_WAL
-%token K_TABLESPACE_MAP
-%token K_TIMELINE
-%token K_PHYSICAL
-%token K_LOGICAL
-%token K_SLOT
-%token K_RESERVE_WAL
-%token K_TEMPORARY
-%token K_EXPORT_SNAPSHOT
-%token K_NOEXPORT_SNAPSHOT
-%token K_USE_SNAPSHOT
-
-%type <node>	command
-%type <node>	base_backup start_replication start_logical_replication
-				create_replication_slot drop_replication_slot identify_system
-				timeline_history show sql_cmd
-%type <list>	base_backup_opt_list
-%type <defelt>	base_backup_opt
-%type <uintval>	opt_timeline
-%type <list>	plugin_options plugin_opt_list
-%type <defelt>	plugin_opt_elem
-%type <node>	plugin_opt_arg
-%type <str>		opt_slot var_name
-%type <boolval>	opt_temporary
-%type <list>	create_slot_opt_list
-%type <defelt>	create_slot_opt
-
-%%
-
-firstcmd: command opt_semicolon
-				{
-					replication_parse_result = $1;
-				}
-			;
-
-opt_semicolon:	';'
-				| /* EMPTY */
-				;
-
-command:
-			identify_system
-			| base_backup
-			| start_replication
-			| start_logical_replication
-			| create_replication_slot
-			| drop_replication_slot
-			| timeline_history
-			| show
-			| sql_cmd
-			;
-
-/*
- * IDENTIFY_SYSTEM
- */
-identify_system:
-			K_IDENTIFY_SYSTEM
-				{
-					$$ = (Node *) makeNode(IdentifySystemCmd);
-				}
-			;
-
-/*
- * SHOW setting
- */
-show:
-			K_SHOW var_name
-				{
-					VariableShowStmt *n = makeNode(VariableShowStmt);
-					n->name = $2;
-					$$ = (Node *) n;
-				}
-
-var_name:	IDENT	{ $$ = $1; }
-			| var_name '.' IDENT
-				{ $$ = psprintf("%s.%s", $1, $3); }
-		;
-
-/*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
- * [MAX_RATE %d] [TABLESPACE_MAP]
- */
-base_backup:
-			K_BASE_BACKUP base_backup_opt_list
-				{
-					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
-					cmd->options = $2;
-					$$ = (Node *) cmd;
-				}
-			;
-
-base_backup_opt_list:
-			base_backup_opt_list base_backup_opt
-				{ $$ = lappend($1, $2); }
-			| /* EMPTY */
-				{ $$ = NIL; }
-			;
-
-base_backup_opt:
-			K_LABEL SCONST
-				{
-				  $$ = makeDefElem("label",
-								   (Node *)makeString($2), -1);
-				}
-			| K_PROGRESS
-				{
-				  $$ = makeDefElem("progress",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_FAST
-				{
-				  $$ = makeDefElem("fast",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_WAL
-				{
-				  $$ = makeDefElem("wal",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_NOWAIT
-				{
-				  $$ = makeDefElem("nowait",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_MAX_RATE UCONST
-				{
-				  $$ = makeDefElem("max_rate",
-								   (Node *)makeInteger($2), -1);
-				}
-			| K_TABLESPACE_MAP
-				{
-				  $$ = makeDefElem("tablespace_map",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			;
-
-create_replication_slot:
-			/* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */
-			K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list
-				{
-					CreateReplicationSlotCmd *cmd;
-					cmd = makeNode(CreateReplicationSlotCmd);
-					cmd->kind = REPLICATION_KIND_PHYSICAL;
-					cmd->slotname = $2;
-					cmd->temporary = $3;
-					cmd->options = $5;
-					$$ = (Node *) cmd;
-				}
-			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
-				{
-					CreateReplicationSlotCmd *cmd;
-					cmd = makeNode(CreateReplicationSlotCmd);
-					cmd->kind = REPLICATION_KIND_LOGICAL;
-					cmd->slotname = $2;
-					cmd->temporary = $3;
-					cmd->plugin = $5;
-					cmd->options = $6;
-					$$ = (Node *) cmd;
-				}
-			;
-
-create_slot_opt_list:
-			create_slot_opt_list create_slot_opt
-				{ $$ = lappend($1, $2); }
-			| /* EMPTY */
-				{ $$ = NIL; }
-			;
-
-create_slot_opt:
-			K_EXPORT_SNAPSHOT
-				{
-				  $$ = makeDefElem("export_snapshot",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_NOEXPORT_SNAPSHOT
-				{
-				  $$ = makeDefElem("export_snapshot",
-								   (Node *)makeInteger(FALSE), -1);
-				}
-			| K_USE_SNAPSHOT
-				{
-				  $$ = makeDefElem("use_snapshot",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_RESERVE_WAL
-				{
-				  $$ = makeDefElem("reserve_wal",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			;
-
-/* DROP_REPLICATION_SLOT slot */
-drop_replication_slot:
-			K_DROP_REPLICATION_SLOT IDENT
-				{
-					DropReplicationSlotCmd *cmd;
-					cmd = makeNode(DropReplicationSlotCmd);
-					cmd->slotname = $2;
-					$$ = (Node *) cmd;
-				}
-			;
-
-/*
- * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
- */
-start_replication:
-			K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
-				{
-					StartReplicationCmd *cmd;
-
-					cmd = makeNode(StartReplicationCmd);
-					cmd->kind = REPLICATION_KIND_PHYSICAL;
-					cmd->slotname = $2;
-					cmd->startpoint = $4;
-					cmd->timeline = $5;
-					$$ = (Node *) cmd;
-				}
-			;
-
-/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
-start_logical_replication:
-			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
-				{
-					StartReplicationCmd *cmd;
-					cmd = makeNode(StartReplicationCmd);
-					cmd->kind = REPLICATION_KIND_LOGICAL;
-					cmd->slotname = $3;
-					cmd->startpoint = $5;
-					cmd->options = $6;
-					$$ = (Node *) cmd;
-				}
-			;
-/*
- * TIMELINE_HISTORY %d
- */
-timeline_history:
-			K_TIMELINE_HISTORY UCONST
-				{
-					TimeLineHistoryCmd *cmd;
-
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-
-					cmd = makeNode(TimeLineHistoryCmd);
-					cmd->timeline = $2;
-
-					$$ = (Node *) cmd;
-				}
-			;
-
-opt_physical:
-			K_PHYSICAL
-			| /* EMPTY */
-			;
-
-opt_temporary:
-			K_TEMPORARY						{ $$ = true; }
-			| /* EMPTY */					{ $$ = false; }
-			;
-
-opt_slot:
-			K_SLOT IDENT
-				{ $$ = $2; }
-			| /* EMPTY */
-				{ $$ = NULL; }
-			;
-
-opt_timeline:
-			K_TIMELINE UCONST
-				{
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-					$$ = $2;
-				}
-				| /* EMPTY */			{ $$ = 0; }
-			;
-
-
-plugin_options:
-			'(' plugin_opt_list ')'			{ $$ = $2; }
-			| /* EMPTY */					{ $$ = NIL; }
-		;
-
-plugin_opt_list:
-			plugin_opt_elem
-				{
-					$$ = list_make1($1);
-				}
-			| plugin_opt_list ',' plugin_opt_elem
-				{
-					$$ = lappend($1, $3);
-				}
-		;
-
-plugin_opt_elem:
-			IDENT plugin_opt_arg
-				{
-					$$ = makeDefElem($1, $2, -1);
-				}
-		;
-
-plugin_opt_arg:
-			SCONST							{ $$ = (Node *) makeString($1); }
-			| /* EMPTY */					{ $$ = NULL; }
-		;
-
-sql_cmd:
-			IDENT							{ $$ = (Node *) make_sqlcmd(); }
-		;
-%%
-
-static SQLCmd *
-make_sqlcmd(void)
-{
-	SQLCmd *cmd = makeNode(SQLCmd);
-	int tok;
-
-	/* Just move lexer to the end of command. */
-	for (;;)
-	{
-		tok = yylex();
-		if (tok == ';' || tok == 0)
-			break;
-	}
-	return cmd;
-}
-
-#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
deleted file mode 100644
index 52ae7b343f..0000000000
--- a/src/backend/replication/repl_scanner.l
+++ /dev/null
@@ -1,248 +0,0 @@
-%{
-/*-------------------------------------------------------------------------
- *
- * repl_scanner.l
- *	  a lexical scanner for the replication commands
- *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- *	  src/backend/replication/repl_scanner.l
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "utils/builtins.h"
-#include "parser/scansup.h"
-
-/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
-#undef fprintf
-#define fprintf(file, fmt, msg)  fprintf_to_ereport(fmt, msg)
-
-static void
-fprintf_to_ereport(const char *fmt, const char *msg)
-{
-	ereport(ERROR, (errmsg_internal("%s", msg)));
-}
-
-/* Handle to the buffer that the lexer uses internally */
-static YY_BUFFER_STATE scanbufhandle;
-
-static StringInfoData litbuf;
-
-static void startlit(void);
-static char *litbufdup(void);
-static void addlit(char *ytext, int yleng);
-static void addlitchar(unsigned char ychar);
-
-%}
-
-%option 8bit
-%option never-interactive
-%option nodefault
-%option noinput
-%option nounput
-%option noyywrap
-%option warn
-%option prefix="replication_yy"
-
-%x xq xd
-
-/* Extended quote
- * xqdouble implements embedded quote, ''''
- */
-xqstart			{quote}
-xqdouble		{quote}{quote}
-xqinside		[^']+
-
-/* Double quote
- * Allows embedded spaces and other special characters into identifiers.
- */
-dquote			\"
-xdstart			{dquote}
-xdstop			{dquote}
-xddouble		{dquote}{dquote}
-xdinside		[^"]+
-
-digit			[0-9]+
-hexdigit		[0-9A-Za-z]+
-
-quote			'
-quotestop		{quote}
-
-ident_start		[A-Za-z\200-\377_]
-ident_cont		[A-Za-z\200-\377_0-9\$]
-
-identifier		{ident_start}{ident_cont}*
-
-%%
-
-BASE_BACKUP			{ return K_BASE_BACKUP; }
-FAST			{ return K_FAST; }
-IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
-SHOW		{ return K_SHOW; }
-LABEL			{ return K_LABEL; }
-NOWAIT			{ return K_NOWAIT; }
-PROGRESS			{ return K_PROGRESS; }
-MAX_RATE		{ return K_MAX_RATE; }
-WAL			{ return K_WAL; }
-TABLESPACE_MAP			{ return K_TABLESPACE_MAP; }
-TIMELINE			{ return K_TIMELINE; }
-START_REPLICATION	{ return K_START_REPLICATION; }
-CREATE_REPLICATION_SLOT		{ return K_CREATE_REPLICATION_SLOT; }
-DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
-TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
-PHYSICAL			{ return K_PHYSICAL; }
-RESERVE_WAL			{ return K_RESERVE_WAL; }
-LOGICAL				{ return K_LOGICAL; }
-SLOT				{ return K_SLOT; }
-TEMPORARY			{ return K_TEMPORARY; }
-EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
-NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
-USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
-
-","				{ return ','; }
-";"				{ return ';'; }
-"("				{ return '('; }
-")"				{ return ')'; }
-
-[\n]			;
-[\t]			;
-" "				;
-
-{digit}+		{
-					yylval.uintval = strtoul(yytext, NULL, 10);
-					return UCONST;
-				}
-
-{hexdigit}+\/{hexdigit}+		{
-					uint32	hi,
-							lo;
-					if (sscanf(yytext, "%X/%X", &hi, &lo) != 2)
-						yyerror("invalid streaming start location");
-					yylval.recptr = ((uint64) hi) << 32 | lo;
-					return RECPTR;
-				}
-
-{xqstart}		{
-					BEGIN(xq);
-					startlit();
-				}
-
-<xq>{quotestop}	{
-					yyless(1);
-					BEGIN(INITIAL);
-					yylval.str = litbufdup();
-					return SCONST;
-				}
-
-<xq>{xqdouble}	{
-					addlitchar('\'');
-				}
-
-<xq>{xqinside}  {
-					addlit(yytext, yyleng);
-				}
-
-{xdstart}		{
-					BEGIN(xd);
-					startlit();
-				}
-
-<xd>{xdstop}	{
-					int len;
-					yyless(1);
-					BEGIN(INITIAL);
-					yylval.str = litbufdup();
-					len = strlen(yylval.str);
-					truncate_identifier(yylval.str, len, true);
-					return IDENT;
-				}
-
-<xd>{xdinside}  {
-					addlit(yytext, yyleng);
-				}
-
-{identifier}	{
-					int len = strlen(yytext);
-
-					yylval.str = downcase_truncate_identifier(yytext, len, true);
-					return IDENT;
-				}
-
-<xq,xd><<EOF>>	{ yyerror("unterminated quoted string"); }
-
-
-<<EOF>>			{
-					yyterminate();
-				}
-
-.				{
-					return T_WORD;
-				}
-%%
-
-
-static void
-startlit(void)
-{
-	initStringInfo(&litbuf);
-}
-
-static char *
-litbufdup(void)
-{
-	return litbuf.data;
-}
-
-static void
-addlit(char *ytext, int yleng)
-{
-	appendBinaryStringInfo(&litbuf, ytext, yleng);
-}
-
-static void
-addlitchar(unsigned char ychar)
-{
-	appendStringInfoChar(&litbuf, ychar);
-}
-
-void
-yyerror(const char *message)
-{
-	ereport(ERROR,
-			(errcode(ERRCODE_SYNTAX_ERROR),
-			 errmsg_internal("%s", message)));
-}
-
-
-void
-replication_scanner_init(const char *str)
-{
-	Size		slen = strlen(str);
-	char	   *scanbuf;
-
-	/*
-	 * Might be left over after ereport()
-	 */
-	if (YY_CURRENT_BUFFER)
-		yy_delete_buffer(YY_CURRENT_BUFFER);
-
-	/*
-	 * Make a scan buffer with special termination needed by flex.
-	 */
-	scanbuf = (char *) palloc(slen + 2);
-	memcpy(scanbuf, str, slen);
-	scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
-	scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
-}
-
-void
-replication_scanner_finish(void)
-{
-	yy_delete_buffer(scanbufhandle);
-	scanbufhandle = NULL;
-}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 064cf5ee28..e3284363e0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -360,18 +360,7 @@ IdentifySystem(void)
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
 	if (MyDatabaseId != InvalidOid)
-	{
-		MemoryContext cur = CurrentMemoryContext;
-
-		/* syscache access needs a transaction env. */
-		StartTransactionCommand();
-		/* make dbname live outside TX context */
-		MemoryContextSwitchTo(cur);
 		dbname = get_database_name(MyDatabaseId);
-		CommitTransactionCommand();
-		/* CommitTransactionCommand switches to TopMemoryContext */
-		MemoryContextSwitchTo(cur);
-	}
 
 	dest = CreateDestReceiver(DestRemoteSimple);
 	MemSet(nulls, false, sizeof(nulls));
@@ -870,6 +859,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 	}
 
+
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
 	{
 		LogicalDecodingContext *ctx;
@@ -908,6 +898,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 								"must not be called in a subtransaction")));
 		}
 
+		/* XXX: document */
+		CommitTransactionCommand();
+
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData);
@@ -924,6 +917,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		/* build initial snapshot, might take a while */
 		DecodingContextFindStartpoint(ctx);
 
+		/* match the CommitTransactionCommand() above */
+		StartTransactionCommand();
+
 		/*
 		 * Export or use the snapshot if we've been asked to do so.
 		 *
@@ -940,8 +936,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 			snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
 			RestoreTransactionSnapshot(snap, MyProc);
+
 		}
 
+
 		/* don't need the decoding context anymore */
 		FreeDecodingContext(ctx);
 
@@ -1354,65 +1352,37 @@ WalSndWaitForWal(XLogRecPtr loc)
 	return RecentFlushPtr;
 }
 
-/*
- * Execute an incoming replication command.
- *
- * Returns true if the cmd_string was recognized as WalSender command, false
- * if not.
- */
 bool
-exec_replication_command(const char *cmd_string)
+exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool is_toplevel)
 {
-	int			parse_rc;
-	Node	   *cmd_node;
-	MemoryContext cmd_context;
-	MemoryContext old_context;
-
-	/*
-	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
-	 * command arrives. Clean up the old stuff if there's anything.
-	 */
-	SnapBuildClearExportedSnapshot();
-
-	CHECK_FOR_INTERRUPTS();
-
-	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
-										"Replication command context",
-										ALLOCSET_DEFAULT_SIZES);
-	old_context = MemoryContextSwitchTo(cmd_context);
-
-	replication_scanner_init(cmd_string);
-	parse_rc = replication_yyparse();
-	if (parse_rc != 0)
+	if (!am_walsender)
+	{
 		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 (errmsg_internal("replication command parser returned %d",
-								  parse_rc))));
-
-	cmd_node = replication_parse_result;
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("replication protocol statement not supported in a normal connection")));
+	}
 
 	/*
 	 * Log replication command if log_replication_commands is enabled. Even
 	 * when it's disabled, log the command with DEBUG1 level for backward
-	 * compatibility. Note that SQL commands are not logged here, and will be
-	 * logged later if log_statement is enabled.
+	 * compatibility.
 	 */
-	if (cmd_node->type != T_SQLCmd)
-		ereport(log_replication_commands ? LOG : DEBUG1,
-				(errmsg("received replication command: %s", cmd_string)));
+	ereport(log_replication_commands ? LOG : DEBUG1,
+			(errmsg("received replication command: %s", query_string)));
 
 	/*
 	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
 	 * called outside of transaction the snapshot should be cleared here.
+	 *
+	 * FIXME: this is completely borked.
 	 */
 	if (!IsTransactionBlock())
 		SnapBuildClearExportedSnapshot();
 
 	/*
-	 * For aborted transactions, don't allow anything except pure SQL,
-	 * the exec_simple_query() will handle it correctly.
+	 * Nothing to do here in an aborted transaction.
 	 */
-	if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
+	if (IsAbortedTransactionBlockState())
 		ereport(ERROR,
 				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
 				 errmsg("current transaction is aborted, "
@@ -1428,7 +1398,7 @@ exec_replication_command(const char *cmd_string)
 	initStringInfo(&reply_message);
 	initStringInfo(&tmpbuf);
 
-	switch (cmd_node->type)
+	switch (nodeTag(cmd->replicationCmd))
 	{
 		case T_IdentifySystemCmd:
 			IdentifySystem();
@@ -1436,64 +1406,46 @@ exec_replication_command(const char *cmd_string)
 
 		case T_BaseBackupCmd:
 			PreventTransactionChain(true, "BASE_BACKUP");
-			SendBaseBackup((BaseBackupCmd *) cmd_node);
+			CommitTransactionCommand();
+			SendBaseBackup((BaseBackupCmd *) cmd->replicationCmd);
+			StartTransactionCommand();
 			break;
 
 		case T_CreateReplicationSlotCmd:
-			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd->replicationCmd);
 			break;
 
 		case T_DropReplicationSlotCmd:
-			DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+			DropReplicationSlot((DropReplicationSlotCmd *) cmd->replicationCmd);
 			break;
 
 		case T_StartReplicationCmd:
 			{
-				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+				StartReplicationCmd *scmd = (StartReplicationCmd *) cmd->replicationCmd;
 
 				PreventTransactionChain(true, "START_REPLICATION");
 
-				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
-					StartReplication(cmd);
+				CommitTransactionCommand();
+
+				if (scmd->kind == REPLICATION_KIND_PHYSICAL)
+					StartReplication(scmd);
 				else
-					StartLogicalReplication(cmd);
+					StartLogicalReplication(scmd);
+
+				StartTransactionCommand();
 				break;
 			}
 
 		case T_TimeLineHistoryCmd:
 			PreventTransactionChain(true, "TIMELINE_HISTORY");
-			SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+			SendTimeLineHistory((TimeLineHistoryCmd *) cmd->replicationCmd);
 			break;
 
-		case T_VariableShowStmt:
-			{
-				DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
-				VariableShowStmt *n = (VariableShowStmt *) cmd_node;
-
-				GetPGVariable(n->name, dest);
-			}
-			break;
-
-		case T_SQLCmd:
-			if (MyDatabaseId == InvalidOid)
-				ereport(ERROR,
-						(errmsg("not connected to database")));
-
-			/* Tell the caller that this wasn't a WalSender command. */
-			return false;
-
 		default:
 			elog(ERROR, "unrecognized replication command node tag: %u",
-				 cmd_node->type);
+				 cmd->replicationCmd->type);
 	}
 
-	/* done */
-	MemoryContextSwitchTo(old_context);
-	MemoryContextDelete(cmd_context);
-
-	/* Send CommandComplete message */
-	EndCommand("SELECT", DestRemote);
-
 	return true;
 }
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a61d..485e61f709 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -173,7 +173,6 @@ static int	InteractiveBackend(StringInfo inBuf);
 static int	interactive_getc(void);
 static int	SocketBackend(StringInfo inBuf);
 static int	ReadCommand(StringInfo inBuf);
-static void forbidden_in_wal_sender(char firstchar);
 static List *pg_rewrite_query(Query *query);
 static bool check_log_statement(List *stmt_list);
 static int	errdetail_execute(List *raw_parsetree_list);
@@ -1015,6 +1014,14 @@ exec_simple_query(const char *query_string)
 		 */
 		if (analyze_requires_snapshot(parsetree))
 		{
+			if (am_walsender && !am_db_walsender)
+			{
+				/* FIXME: message */
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("non-replication statement not supported in pure replication connection")));
+			}
+
 			PushActiveSnapshot(GetTransactionSnapshot());
 			snapshot_set = true;
 		}
@@ -1325,6 +1332,14 @@ exec_parse_message(const char *query_string,	/* string to execute */
 		 */
 		if (analyze_requires_snapshot(raw_parse_tree))
 		{
+			if (am_walsender && !am_db_walsender)
+			{
+				/* FIXME: message */
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("non-replication statement not supported in pure replication connection")));
+			}
+
 			PushActiveSnapshot(GetTransactionSnapshot());
 			snapshot_set = true;
 		}
@@ -1610,6 +1625,14 @@ exec_bind_message(StringInfo input_message)
 		(psrc->raw_parse_tree &&
 		 analyze_requires_snapshot(psrc->raw_parse_tree)))
 	{
+		if (am_walsender && !am_db_walsender)
+		{
+			/* FIXME: message */
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("non-replication statement not supported in pure replication connection")));
+		}
+
 		PushActiveSnapshot(GetTransactionSnapshot());
 		snapshot_set = true;
 	}
@@ -4066,13 +4089,7 @@ PostgresMain(int argc, char *argv[],
 					query_string = pq_getmsgstring(&input_message);
 					pq_getmsgend(&input_message);
 
-					if (am_walsender)
-					{
-						if (!exec_replication_command(query_string))
-							exec_simple_query(query_string);
-					}
-					else
-						exec_simple_query(query_string);
+					exec_simple_query(query_string);
 
 					send_ready_for_query = true;
 				}
@@ -4085,8 +4102,6 @@ PostgresMain(int argc, char *argv[],
 					int			numParams;
 					Oid		   *paramTypes = NULL;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
 
@@ -4109,8 +4124,6 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'B':			/* bind */
-				forbidden_in_wal_sender(firstchar);
-
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();
 
@@ -4126,8 +4139,6 @@ PostgresMain(int argc, char *argv[],
 					const char *portal_name;
 					int			max_rows;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();
 
@@ -4140,8 +4151,6 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'F':			/* fastpath function call */
-				forbidden_in_wal_sender(firstchar);
-
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();
 
@@ -4177,8 +4186,6 @@ PostgresMain(int argc, char *argv[],
 					int			close_type;
 					const char *close_target;
 
-					forbidden_in_wal_sender(firstchar);
-
 					close_type = pq_getmsgbyte(&input_message);
 					close_target = pq_getmsgstring(&input_message);
 					pq_getmsgend(&input_message);
@@ -4221,8 +4228,6 @@ PostgresMain(int argc, char *argv[],
 					int			describe_type;
 					const char *describe_target;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() (needed for xact) */
 					SetCurrentStatementStartTimestamp();
 
@@ -4305,30 +4310,6 @@ PostgresMain(int argc, char *argv[],
 }
 
 /*
- * Throw an error if we're a WAL sender process.
- *
- * This is used to forbid anything else than simple query protocol messages
- * in a WAL sender process.  'firstchar' specifies what kind of a forbidden
- * message was received, and is used to construct the error message.
- */
-static void
-forbidden_in_wal_sender(char firstchar)
-{
-	if (am_walsender)
-	{
-		if (firstchar == 'F')
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("fastpath function calls not supported in a replication connection")));
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("extended query protocol not supported in a replication connection")));
-	}
-}
-
-
-/*
  * Obtain platform stack depth limit (in bytes)
  *
  * Return -1 if unknown
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index e30aeb1c7f..450ee08461 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1155,6 +1155,8 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
 		  IsA(utilityStmt, VariableSetStmt) ||
 		  IsA(utilityStmt, VariableShowStmt) ||
 		  IsA(utilityStmt, ConstraintsSetStmt) ||
+		  /* replication commands shouldn't run with a snapshot */
+		  IsA(utilityStmt, ReplicationCmd) ||
 	/* efficiency hacks from here down */
 		  IsA(utilityStmt, FetchStmt) ||
 		  IsA(utilityStmt, ListenStmt) ||
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 24e5c427c6..a313fde8d9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,10 +57,12 @@
 #include "commands/vacuum.h"
 #include "commands/view.h"
 #include "miscadmin.h"
+#include "nodes/replnodes.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteRemove.h"
+#include "replication/walsender.h"
 #include "storage/fd.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
@@ -923,6 +925,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_ReplicationCmd:
+			exec_replication_command((ReplicationCmd *) parsetree, queryString, isTopLevel);
+			strcpy(completionTag, CreateCommandTag(parsetree));
+			break;
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2843,6 +2850,40 @@ CreateCommandTag(Node *parsetree)
 			}
 			break;
 
+		case T_ReplicationCmd:
+			switch (nodeTag(((ReplicationCmd *) parsetree)->replicationCmd))
+			{
+				case T_IdentifySystemCmd:
+					tag = "IDENTIFY_SYSTEM";
+					break;
+
+				case T_BaseBackupCmd:
+					tag = "BASE_BACKUP";
+					break;
+
+				case T_CreateReplicationSlotCmd:
+					tag = "CREATE_REPLICATION_SLOT";
+					break;
+
+				case T_DropReplicationSlotCmd:
+					tag = "DROP_REPLICATION_SLOT";
+					break;
+
+				case T_StartReplicationCmd:
+					tag = "START_REPLICATION";
+					break;
+
+				case T_TimeLineHistoryCmd:
+					tag = "TIMELINE_HISTORY";
+					break;
+				default:
+					elog(WARNING, "unrecognized replication command: %d",
+						 (int) nodeTag(((ReplicationCmd *) parsetree)->replicationCmd));
+					tag = "???";
+					break;
+			}
+			break;
+
 		default:
 			elog(WARNING, "unrecognized node type: %d",
 				 (int) nodeTag(parsetree));
@@ -3349,6 +3390,11 @@ GetCommandLogLevel(Node *parsetree)
 			}
 			break;
 
+		case T_ReplicationCmd:
+			/* FIXME: Invent new category? */
+			lev = LOGSTMT_DDL;
+			break;
+
 		default:
 			elog(WARNING, "unrecognized node type: %d",
 				 (int) nodeTag(parsetree));
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 6b081bd737..2950c7b338 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -240,7 +240,7 @@ StreamLogicalLog(void)
 				replication_slot);
 
 	/* Initiate the replication stream at specified location */
-	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
+	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL '%X/%X'",
 			 replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
 
 	/* print options if there are any */
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8511e57cf7..5a11bb8c4f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -583,7 +583,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			return true;
 
 		/* Initiate the replication stream at specified location */
-		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+		snprintf(query, sizeof(query), "START_REPLICATION %s'%X/%X' TIMELINE %u",
 				 slotcmd,
 				 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
 				 stream->timeline);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index f59d719923..107b36fd6e 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -472,13 +472,13 @@ typedef enum NodeTag
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
+	T_ReplicationCmd, /* container for all other replication commands */
 	T_IdentifySystemCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
-	T_SQLCmd,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 92ada41b6d..b4afab85da 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -17,6 +17,14 @@
 #include "access/xlogdefs.h"
 #include "nodes/pg_list.h"
 
+
+typedef struct ReplicationCmd
+{
+	NodeTag		type;
+
+	Node	   *replicationCmd;
+} ReplicationCmd;
+
 typedef enum ReplicationKind
 {
 	REPLICATION_KIND_PHYSICAL,
diff --git a/src/include/parser/gramparse.h b/src/include/parser/gramparse.h
index 2da98f67f3..0a3edc79e2 100644
--- a/src/include/parser/gramparse.h
+++ b/src/include/parser/gramparse.h
@@ -20,6 +20,7 @@
 #define GRAMPARSE_H
 
 #include "nodes/parsenodes.h"
+#include "access/xlogdefs.h"
 #include "parser/scanner.h"
 
 /*
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 37542aaee4..7130803e21 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -53,6 +53,7 @@ PG_KEYWORD("attach", ATTACH, UNRESERVED_KEYWORD)
 PG_KEYWORD("attribute", ATTRIBUTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("authorization", AUTHORIZATION, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("backward", BACKWARD, UNRESERVED_KEYWORD)
+PG_KEYWORD("base_backup", BASE_BACKUP, UNRESERVED_KEYWORD)
 PG_KEYWORD("before", BEFORE, UNRESERVED_KEYWORD)
 PG_KEYWORD("begin", BEGIN_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("between", BETWEEN, COL_NAME_KEYWORD)
@@ -99,6 +100,7 @@ PG_KEYWORD("conversion", CONVERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("copy", COPY, UNRESERVED_KEYWORD)
 PG_KEYWORD("cost", COST, UNRESERVED_KEYWORD)
 PG_KEYWORD("create", CREATE, RESERVED_KEYWORD)
+PG_KEYWORD("create_replication_slot", CREATE_REPLICATION_SLOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("cross", CROSS, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("csv", CSV, UNRESERVED_KEYWORD)
 PG_KEYWORD("cube", CUBE, UNRESERVED_KEYWORD)
@@ -139,6 +141,7 @@ PG_KEYWORD("document", DOCUMENT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("domain", DOMAIN_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("double", DOUBLE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("drop", DROP, UNRESERVED_KEYWORD)
+PG_KEYWORD("drop_replication_slot", DROP_REPLICATION_SLOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("each", EACH, UNRESERVED_KEYWORD)
 PG_KEYWORD("else", ELSE, RESERVED_KEYWORD)
 PG_KEYWORD("enable", ENABLE_P, UNRESERVED_KEYWORD)
@@ -155,11 +158,13 @@ PG_KEYWORD("exclusive", EXCLUSIVE, UNRESERVED_KEYWORD)
 PG_KEYWORD("execute", EXECUTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("exists", EXISTS, COL_NAME_KEYWORD)
 PG_KEYWORD("explain", EXPLAIN, UNRESERVED_KEYWORD)
+PG_KEYWORD("export_snapshot", EXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("extension", EXTENSION, UNRESERVED_KEYWORD)
 PG_KEYWORD("external", EXTERNAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
 PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
+PG_KEYWORD("fast", FAST_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
@@ -186,6 +191,7 @@ PG_KEYWORD("having", HAVING, RESERVED_KEYWORD)
 PG_KEYWORD("header", HEADER_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD)
 PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("identify_system", IDENTIFY_SYSTEM, UNRESERVED_KEYWORD)
 PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD)
@@ -240,9 +246,11 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("logical", LOGICAL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
+PG_KEYWORD("max_rate", MAX_RATE, UNRESERVED_KEYWORD)
 PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD)
 PG_KEYWORD("method", METHOD, UNRESERVED_KEYWORD)
 PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD)
@@ -258,6 +266,7 @@ PG_KEYWORD("nchar", NCHAR, COL_NAME_KEYWORD)
 PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
 PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
+PG_KEYWORD("noexport_snapshot", NOEXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
 PG_KEYWORD("norefresh", NOREFRESH, UNRESERVED_KEYWORD)
 PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
@@ -297,6 +306,7 @@ PG_KEYWORD("partial", PARTIAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("partition", PARTITION, UNRESERVED_KEYWORD)
 PG_KEYWORD("passing", PASSING, UNRESERVED_KEYWORD)
 PG_KEYWORD("password", PASSWORD, UNRESERVED_KEYWORD)
+PG_KEYWORD("physical", PHYSICAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("placing", PLACING, RESERVED_KEYWORD)
 PG_KEYWORD("plans", PLANS, UNRESERVED_KEYWORD)
 PG_KEYWORD("policy", POLICY, UNRESERVED_KEYWORD)
@@ -312,6 +322,7 @@ PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD)
 PG_KEYWORD("program", PROGRAM, UNRESERVED_KEYWORD)
+PG_KEYWORD("progress", PROGRESS, UNRESERVED_KEYWORD)
 PG_KEYWORD("publication", PUBLICATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD)
@@ -331,6 +342,7 @@ PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD)
 PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD)
 PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD)
+PG_KEYWORD("reserve_wal", RESERVE_WAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD)
 PG_KEYWORD("restart", RESTART, UNRESERVED_KEYWORD)
 PG_KEYWORD("restrict", RESTRICT, UNRESERVED_KEYWORD)
@@ -374,6 +386,7 @@ PG_KEYWORD("sql", SQL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("stable", STABLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("standalone", STANDALONE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("start", START, UNRESERVED_KEYWORD)
+PG_KEYWORD("start_replication", START_REPLICATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("statement", STATEMENT, UNRESERVED_KEYWORD)
 PG_KEYWORD("statistics", STATISTICS, UNRESERVED_KEYWORD)
 PG_KEYWORD("stdin", STDIN, UNRESERVED_KEYWORD)
@@ -390,12 +403,15 @@ PG_KEYWORD("table", TABLE, RESERVED_KEYWORD)
 PG_KEYWORD("tables", TABLES, UNRESERVED_KEYWORD)
 PG_KEYWORD("tablesample", TABLESAMPLE, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("tablespace", TABLESPACE, UNRESERVED_KEYWORD)
+PG_KEYWORD("tablespace_map", TABLESPACE_MAP_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("temp", TEMP, UNRESERVED_KEYWORD)
 PG_KEYWORD("template", TEMPLATE, UNRESERVED_KEYWORD)
 PG_KEYWORD("temporary", TEMPORARY, UNRESERVED_KEYWORD)
 PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeline", TIMELINE_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("timeline_history", TIMELINE_HISTORY, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -419,6 +435,7 @@ PG_KEYWORD("unlisten", UNLISTEN, UNRESERVED_KEYWORD)
 PG_KEYWORD("unlogged", UNLOGGED, UNRESERVED_KEYWORD)
 PG_KEYWORD("until", UNTIL, UNRESERVED_KEYWORD)
 PG_KEYWORD("update", UPDATE, UNRESERVED_KEYWORD)
+PG_KEYWORD("use_snapshot", USE_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("user", USER, RESERVED_KEYWORD)
 PG_KEYWORD("using", USING, RESERVED_KEYWORD)
 PG_KEYWORD("vacuum", VACUUM, UNRESERVED_KEYWORD)
@@ -435,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wal", WAL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2ca903872e..65026eb83b 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,6 +16,9 @@
 
 #include "fmgr.h"
 
+#include "nodes/nodes.h"
+#include "nodes/replnodes.h"
+
 /*
  * What to do with a snapshot in create replication slot command.
  */
@@ -38,7 +41,7 @@ extern int	wal_sender_timeout;
 extern bool log_replication_commands;
 
 extern void InitWalSender(void);
-extern bool exec_replication_command(const char *query_string);
+extern bool exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool is_toplevel);
 extern void WalSndErrorCleanup(void);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
-- 
2.12.0.264.gd6db3f2165.dirty
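
For client authors, the visible consequence of the pg_recvlogical.c and
receivelog.c hunks above is that the start position is now sent as a quoted
string ('%X/%X') rather than a bare token, presumably because the merged
grammar has no dedicated RECPTR token. A minimal sketch of building such a
command follows. The helper name format_start_replication and its parameters
are hypothetical and not part of the patch; only the format string is taken
from the receivelog.c hunk.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper (not in the patch): build a physical
 * START_REPLICATION command with the LSN quoted as a string literal,
 * mirroring the format string used in the receivelog.c hunk.
 *
 * slotcmd is either "" or something like "SLOT \"name\" " (with a
 * trailing space); that convention is an assumption for this sketch.
 *
 * Returns the snprintf() result, i.e. the length that would have been
 * written had the buffer been large enough.
 */
static int
format_start_replication(char *buf, size_t buflen, const char *slotcmd,
						 uint64_t startpos, unsigned int timeline)
{
	return snprintf(buf, buflen,
					"START_REPLICATION %s'%X/%X' TIMELINE %u",
					slotcmd,
					(unsigned int) (startpos >> 32),	/* high 32 bits */
					(unsigned int) startpos,			/* low 32 bits */
					timeline);
}
```

A caller should compare the return value against the buffer size to detect
truncation before sending the command.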
