From 994ff0d2e0dff20aef87e5b423ae92d4bab75e5e Mon Sep 17 00:00:00 2001
From: Nikita Malakhov <n.malakhov@postgrespro.ru>
Date: Wed, 13 Aug 2025 00:34:57 +0300
Subject: [PATCH] Introducing detoast iterators POC patch. The main purpose of
 the detoast iterator is partial detoasting when we do not need the full
 value, i.e. searching for substring and JSON storage optimization, meaning
 partial detoasting for large JSON values.

This patch is based on an old one:
https://www.postgresql.org/message-id/flat/CAL-OGks_onzpc9M9bXPCztMofWULcFkyeCeKiAgXzwRL8kXiag%40mail.gmail.com
with some improvements.

Author: Nikita Malakhov
---
 src/backend/access/common/detoast.c           | 246 ++++++++++++++++
 src/backend/access/common/toast_compression.c |  16 +-
 src/backend/access/common/toast_internals.c   | 274 ++++++++++++++++++
 src/backend/access/transam/xlogreader.c       |   5 +-
 src/backend/utils/adt/varlena.c               |  30 +-
 src/common/pg_lzcompress.c                    | 134 ++++++---
 src/include/access/detoast.h                  |  99 +++++++
 src/include/access/toast_compression.h        |   5 +
 src/include/common/pg_lzcompress.h            |   8 +
 src/include/fmgr.h                            |  13 +
 10 files changed, 782 insertions(+), 48 deletions(-)

diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c
index 62651787742..aa9fc494430 100644
--- a/src/backend/access/common/detoast.c
+++ b/src/backend/access/common/detoast.c
@@ -17,6 +17,7 @@
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/toast_internals.h"
+#include "access/toast_compression.h"
 #include "common/int.h"
 #include "common/pg_lzcompress.h"
 #include "utils/expandeddatum.h"
@@ -28,6 +29,52 @@ static struct varlena *toast_fetch_datum_slice(struct varlena *attr,
 											   int32 slicelength);
 static struct varlena *toast_decompress_datum(struct varlena *attr);
 static struct varlena *toast_decompress_datum_slice(struct varlena *attr, int32 slicelength);
+static void toast_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
+						 ToastCompressionId compression_method,
+						 void **decompression_state,
+						 const char *destend);
+static void free_detoast_iterator_resources(DetoastIterator iter);
+
+
+/*
+ * detoast_iterate - fetch (and, if compressed, decompress) more data.
+ */
+void
+detoast_iterate(DetoastIterator detoast_iter, const char *destend)
+{
+	FetchDatumIterator fetch_iter;
+
+	Assert(detoast_iter != NULL && !detoast_iter->done);
+	fetch_iter = detoast_iter->fetch_datum_iterator;
+
+	/* a target position is only meaningful when decompressing */
+	if (!detoast_iter->compressed)
+		destend = NULL;
+
+	if (destend)
+	{
+		/* same end-of-input margin as toast_decompress_iterate() uses */
+		const char *srcend = (const char *)
+			(fetch_iter->buf->limit == fetch_iter->buf->capacity ?
+			fetch_iter->buf->limit : fetch_iter->buf->limit - 4);
+
+		if (fetch_iter->buf->position >= srcend && !fetch_iter->done)
+			fetch_datum_iterate(fetch_iter);
+	}
+	else if (!fetch_iter->done)
+		fetch_datum_iterate(fetch_iter);
+
+	if (detoast_iter->compressed)
+		toast_decompress_iterate(fetch_iter->buf, detoast_iter->buf,
+								 detoast_iter->compression_method,
+								 &detoast_iter->decompression_state,
+								 destend);
+
+	/* output buffer full: done; free_detoast_iterator() frees the rest */
+	if (detoast_iter->buf->limit == detoast_iter->buf->capacity)
+		detoast_iter->done = true;
+}
+
 
 /* ----------
  * detoast_external_attr -
@@ -644,3 +691,202 @@ toast_datum_size(Datum value)
 	}
 	return result;
 }
+
+static void
+free_detoast_iterator_resources(DetoastIterator iter)
+{
+	/* pglz_decompress_ext() palloc's its resume state; release it too */
+	if (iter->decompression_state)
+		pfree(iter->decompression_state);
+	iter->decompression_state = NULL;
+	/* an uncompressed iterator merely borrows the fetch iterator's buffer */
+	if (iter->compressed)
+		free_toast_buffer(iter->buf);
+	iter->buf = NULL;
+	free_fetch_datum_iterator(iter->fetch_datum_iterator);
+	iter->fetch_datum_iterator = NULL;
+	if (iter->self_ptr)
+		*iter->self_ptr = NULL;
+}
+
+/* ----------
+ * create_detoast_iterator -
+ *
+ * Build an iterator for an on-disk external value (following an indirect
+ * pointer if needed) or an in-line compressed value; NULL otherwise.
+ * ----------
+ */
+DetoastIterator
+create_detoast_iterator(struct varlena *attr)
+{
+	struct varatt_external toast_pointer;
+	DetoastIterator iter;
+
+	if (VARATT_IS_EXTERNAL_ONDISK(attr))
+	{
+		FetchDatumIterator fetch_iter;
+
+		iter = (DetoastIterator) palloc0(sizeof(DetoastIteratorData));
+		iter->done = false;
+		iter->nrefs = 1;
+
+		/* This is an externally stored datum --- initialize fetch datum iterator */
+		iter->fetch_datum_iterator = fetch_iter = create_fetch_datum_iterator(attr);
+		VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
+		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
+		{
+			iter->compressed = true;
+			iter->compression_method = VARATT_EXTERNAL_GET_COMPRESS_METHOD(toast_pointer);
+
+			/* prepare buffer to receive decompressed data */
+			iter->buf = create_toast_buffer(toast_pointer.va_rawsize, false);
+		}
+		else
+		{
+			iter->compressed = false;
+			iter->compression_method = TOAST_INVALID_COMPRESSION_ID;
+
+			/* point the buffer directly at the raw data */
+			iter->buf = fetch_iter->buf;
+		}
+		return iter;
+	}
+	else if (VARATT_IS_EXTERNAL_INDIRECT(attr))
+	{
+		/* indirect pointer --- dereference it */
+		struct varatt_indirect redirect;
+
+		VARATT_EXTERNAL_GET_POINTER(redirect, attr);
+		attr = (struct varlena *) redirect.pointer;
+
+		/* nested indirect Datums aren't allowed */
+		Assert(!VARATT_IS_EXTERNAL_INDIRECT(attr));
+
+		/* recurse in case value is still extended in some other way */
+		return create_detoast_iterator(attr);
+	}
+	else if (VARATT_IS_COMPRESSED(attr))
+	{
+		ToastBuffer *buf;
+
+		iter = (DetoastIterator) palloc0(sizeof(DetoastIteratorData));
+		iter->done = false;
+		iter->nrefs = 1;
+
+		iter->fetch_datum_iterator = palloc0(sizeof(*iter->fetch_datum_iterator));
+		iter->fetch_datum_iterator->buf = buf = create_toast_buffer(VARSIZE_ANY(attr), true);
+		iter->fetch_datum_iterator->done = true;
+		iter->compressed = true;
+		iter->compression_method = VARDATA_COMPRESSED_GET_COMPRESS_METHOD(attr);
+		/* the source buffer holds the whole datum, so nothing is left to fetch */
+		memcpy((void *) buf->buf, attr, VARSIZE_ANY(attr));
+		buf->limit = (char *) buf->capacity;
+
+		/* prepare buffer to receive decompressed data */
+		iter->buf = create_toast_buffer(TOAST_COMPRESS_EXTSIZE(attr) + VARHDRSZ, false);
+
+		return iter;
+	}
+	else
+		/* uncompressed in-line value -- no iterator is needed */
+		return NULL;
+}
+
+/* ----------
+ * free_detoast_iterator -
+ *
+ * Free memory used by the de-TOAST iterator, including buffers and
+ * fetch datum iterator.
+ * ----------
+ */
+void
+free_detoast_iterator(DetoastIterator iter)
+{
+	if (iter == NULL)
+		return;
+
+	if (--iter->nrefs > 0)
+		return;
+
+	free_detoast_iterator_resources(iter);
+
+	pfree(iter);
+}
+
+static void
+toast_decompress_iterate(ToastBuffer *source, ToastBuffer *dest,
+						 ToastCompressionId compression_method,
+						 void **decompression_state,
+						 const char *destend)
+{
+	const char *sp;
+	const char *srcend;
+	char	   *dp;
+	int32		dlen;
+	int32		slen;
+	bool		last_source_chunk;
+
+	/*
+	 * In the while loop, sp may be incremented such that it points beyond
+	 * srcend. To guard against reading beyond the end of the current chunk,
+	 * we set srcend such that we exit the loop when we are within four bytes
+	 * of the end of the current chunk. When source->limit reaches
+	 * source->capacity, we are decompressing the last chunk, so we can (and
+	 * need to) read every byte.
+	 */
+	last_source_chunk = source->limit == source->capacity;
+	srcend = last_source_chunk ? source->limit : source->limit - 4;
+	sp = source->position;
+	dp = dest->limit;
+	if (destend > dest->capacity)
+		destend = dest->capacity;
+
+	slen = srcend - source->position;
+
+	/*
+	 * Decompress the data using the appropriate decompression routine.
+	 */
+	switch (compression_method)
+	{
+		case TOAST_PGLZ_COMPRESSION_ID:
+			dlen = pglz_decompress_ext(sp, &slen, dp, destend - dp,
+										 last_source_chunk && destend == dest->capacity,
+										 last_source_chunk,
+										 true, decompression_state);
+			break;
+		case TOAST_LZ4_COMPRESSION_ID:
+			if (source->limit < source->capacity)
+				dlen = 0;	/* LZ4 needs the full data to decompress */
+			else
+			{
+				/* decompress the data */
+#ifndef USE_LZ4
+				NO_LZ4_SUPPORT();
+				dlen = 0;
+#else
+				dlen = LZ4_decompress_safe(source->buf + VARHDRSZ_COMPRESSED,
+										   VARDATA(dest->buf),
+										   VARSIZE(source->buf) - VARHDRSZ_COMPRESSED,
+										   VARDATA_COMPRESSED_GET_EXTSIZE(source->buf));
+
+				if (dlen < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg_internal("compressed lz4 data is corrupt")));
+#endif
+				slen = 0;
+			}
+			break;
+		default:
+			elog(ERROR, "invalid compression method id %d", compression_method);
+			return;		/* keep compiler quiet */
+	}
+
+	if (dlen < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg_internal("compressed data is corrupt")));
+
+	source->position += slen;
+	dest->limit += dlen;
+}
diff --git a/src/backend/access/common/toast_compression.c b/src/backend/access/common/toast_compression.c
index 926f1e4008a..21328975f72 100644
--- a/src/backend/access/common/toast_compression.c
+++ b/src/backend/access/common/toast_compression.c
@@ -83,15 +83,17 @@ pglz_decompress_datum(const struct varlena *value)
 {
 	struct varlena *result;
 	int32		rawsize;
+	int32		slen;
 
 	/* allocate memory for the uncompressed data */
 	result = (struct varlena *) palloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ);
 
 	/* decompress the data */
-	rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESSED,
-							  VARSIZE(value) - VARHDRSZ_COMPRESSED,
+	slen = VARSIZE(value) - VARHDRSZ_COMPRESSED;
+	rawsize = pglz_decompress_ext((char *) value + VARHDRSZ_COMPRESSED,
+							  &slen,
 							  VARDATA(result),
-							  VARDATA_COMPRESSED_GET_EXTSIZE(value), true);
+							  VARDATA_COMPRESSED_GET_EXTSIZE(value), true, true, false, NULL);
 	if (rawsize < 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_DATA_CORRUPTED),
@@ -111,15 +113,17 @@ pglz_decompress_datum_slice(const struct varlena *value,
 {
 	struct varlena *result;
 	int32		rawsize;
+	int32		slen;
 
 	/* allocate memory for the uncompressed data */
 	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
 
 	/* decompress the data */
-	rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESSED,
-							  VARSIZE(value) - VARHDRSZ_COMPRESSED,
+	slen = VARSIZE(value) - VARHDRSZ_COMPRESSED;
+	rawsize = pglz_decompress_ext((char *) value + VARHDRSZ_COMPRESSED,
+							  &slen,
 							  VARDATA(result),
-							  slicelength, false);
+							  slicelength, false, true, false, NULL);
 	if (rawsize < 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_DATA_CORRUPTED),
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index a1d0eed8953..82be8c26399 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -28,6 +28,7 @@
 
 static bool toastrel_valueid_exists(Relation toastrel, Oid valueid);
 static bool toastid_valueid_exists(Oid toastrelid, Oid valueid);
+void init_toast_snapshot(Snapshot toast_snapshot);
 
 /* ----------
  * toast_compress_datum -
@@ -654,3 +655,276 @@ get_toast_snapshot(void)
 
 	return &SnapshotToastData;
 }
+
+/* ----------
+ * init_toast_snapshot
+ *
+ *	Mark the caller-supplied snapshot struct as a TOAST snapshot; raises an
+ *	error if GetTransactionSnapshot() returns NULL.
+ *	NOTE(review): the transaction snapshot is used only for that NULL check
+ *	and only snapshot_type is set here -- confirm that this is intended.
+ */
+void
+init_toast_snapshot(Snapshot toast_snapshot)
+{
+	Snapshot	snapshot = GetTransactionSnapshot();
+
+	if (snapshot == NULL)
+		elog(ERROR, "cannot fetch toast data without an active snapshot");
+	toast_snapshot->snapshot_type = SNAPSHOT_TOAST;
+}
+
+static void
+create_fetch_datum_iterator_scan(FetchDatumIterator iter)
+{
+	int			validIndex;
+
+	MemoryContext oldcxt = MemoryContextSwitchTo(iter->mcxt);
+
+	/*
+	 * Open the toast relation and its indexes
+	 */
+	iter->toastrel = table_open(iter->toast_pointer.va_toastrelid, AccessShareLock);
+
+	/* Look for the valid index of the toast relation */
+	validIndex = toast_open_indexes(iter->toastrel,
+									AccessShareLock,
+									&iter->toastidxs,
+									&iter->num_indexes);
+
+	/*
+	 * Setup a scan key to fetch from the index by va_valueid
+	 */
+	ScanKeyInit(&iter->toastkey,
+				(AttrNumber) 1,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(iter->toast_pointer.va_valueid));
+
+	/*
+	 * Read the chunks by index
+	 *
+	 * Note that because the index is actually on (valueid, chunkidx) we will
+	 * see the chunks in chunkidx order, even though we didn't explicitly ask
+	 * for it.
+	 */
+
+	init_toast_snapshot(&iter->snapshot);
+	iter->toastscan = systable_beginscan_ordered(iter->toastrel, iter->toastidxs[validIndex],
+												 &iter->snapshot, 1, &iter->toastkey);
+
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/* ----------
+ * create_fetch_datum_iterator -
+ *
+ * Initialize fetch datum iterator.
+ * ----------
+ */
+FetchDatumIterator
+create_fetch_datum_iterator(struct varlena *attr)
+{
+	FetchDatumIterator iter;
+
+	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
+		elog(ERROR, "create_fetch_datum_iterator shouldn't be called for non-ondisk datums");
+
+	iter = (FetchDatumIterator) palloc0(sizeof(FetchDatumIteratorData));
+
+	iter->mcxt = CurrentMemoryContext;
+
+	/* Must copy to access aligned fields */
+	VARATT_EXTERNAL_GET_POINTER(iter->toast_pointer, attr);
+
+	iter->ressize = VARATT_EXTERNAL_GET_EXTSIZE(iter->toast_pointer);
+	iter->numchunks = ((iter->ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
+
+	iter->buf = create_toast_buffer(iter->ressize + VARHDRSZ,
+									VARATT_EXTERNAL_IS_COMPRESSED(iter->toast_pointer));
+
+	iter->nextidx = 0;
+	iter->done = false;
+
+	return iter;
+}
+
+void
+free_fetch_datum_iterator(FetchDatumIterator iter)
+{
+	if (iter == NULL)
+		return;
+
+	if (!iter->done && iter->toastscan)
+	{
+		systable_endscan_ordered(iter->toastscan);
+		toast_close_indexes(iter->toastidxs, iter->num_indexes, AccessShareLock);
+		table_close(iter->toastrel, AccessShareLock);
+	}
+	free_toast_buffer(iter->buf);
+	pfree(iter);
+}
+
+/* ----------
+ * fetch_datum_iterate -
+ *
+ * Iterate through the toasted value referenced by iterator.
+ *
+ * As long as there is another chunk data in external storage,
+ * fetch it into iterator's toast buffer.
+ * ----------
+ */
+void
+fetch_datum_iterate(FetchDatumIterator iter)
+{
+	HeapTuple	ttup;
+	TupleDesc	toasttupDesc;
+	int32		residx;
+	Pointer		chunk;
+	bool		isnull;
+	char		*chunkdata;
+	int32		chunksize;
+
+	Assert(iter != NULL && !iter->done);
+
+	if (!iter->toastscan)
+		create_fetch_datum_iterator_scan(iter);
+
+	ttup = systable_getnext_ordered(iter->toastscan, ForwardScanDirection);
+	if (ttup == NULL)
+	{
+		/*
+		 * Final checks that we successfully fetched the datum
+		 */
+		if (iter->nextidx != iter->numchunks)
+			elog(ERROR, "missing chunk number %d for toast value %u in %s",
+				 iter->nextidx,
+				 iter->toast_pointer.va_valueid,
+				 RelationGetRelationName(iter->toastrel));
+
+		/*
+		 * End scan and close relations
+		 */
+		systable_endscan_ordered(iter->toastscan);
+		toast_close_indexes(iter->toastidxs, iter->num_indexes, AccessShareLock);
+		table_close(iter->toastrel, AccessShareLock);
+
+		iter->done = true;
+		return;
+	}
+
+	/*
+	 * Have a chunk, extract the sequence number and the data
+	 */
+	toasttupDesc = iter->toastrel->rd_att;
+	residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
+	Assert(!isnull);
+	chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
+	Assert(!isnull);
+	if (!VARATT_IS_EXTENDED(chunk))
+	{
+		chunksize = VARSIZE(chunk) - VARHDRSZ;
+		chunkdata = VARDATA(chunk);
+	}
+	else if (VARATT_IS_SHORT(chunk))
+	{
+		/* could happen due to heap_form_tuple doing its thing */
+		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
+		chunkdata = VARDATA_SHORT(chunk);
+	}
+	else
+	{
+		/* should never happen */
+		elog(ERROR, "found toasted toast chunk for toast value %u in %s",
+			 iter->toast_pointer.va_valueid,
+			 RelationGetRelationName(iter->toastrel));
+		chunksize = 0;		/* keep compiler quiet */
+		chunkdata = NULL;
+	}
+
+	/*
+	 * Some checks on the data we've found
+	 */
+	if (residx != iter->nextidx)
+		elog(ERROR, "unexpected chunk number %d (expected %d) for toast value %u in %s",
+			 residx, iter->nextidx,
+			 iter->toast_pointer.va_valueid,
+			 RelationGetRelationName(iter->toastrel));
+	if (residx < iter->numchunks - 1)
+	{
+		if (chunksize != TOAST_MAX_CHUNK_SIZE)
+			elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
+				 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
+				 residx, iter->numchunks,
+				 iter->toast_pointer.va_valueid,
+				 RelationGetRelationName(iter->toastrel));
+	}
+	else if (residx == iter->numchunks - 1)
+	{
+		if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != iter->ressize)
+			elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
+				 chunksize,
+				 (int) (iter->ressize - residx * TOAST_MAX_CHUNK_SIZE),
+				 residx,
+				 iter->toast_pointer.va_valueid,
+				 RelationGetRelationName(iter->toastrel));
+	}
+	else
+		elog(ERROR, "unexpected chunk number %d (out of range %d..%d) for toast value %u in %s",
+			 residx,
+			 0, iter->numchunks - 1,
+			 iter->toast_pointer.va_valueid,
+			 RelationGetRelationName(iter->toastrel));
+
+	/*
+	 * Copy the data into proper place in our iterator buffer
+	 */
+	memcpy(iter->buf->limit, chunkdata, chunksize);
+	iter->buf->limit += chunksize;
+
+	iter->nextidx++;
+}
+
+/* ----------
+ * create_toast_buffer -
+ *
+ * Create and initialize a TOAST buffer.
+ *
+ * size: total buffer size in bytes, including the varlena header
+ * compressed: whether the buffer will hold a compressed TOAST value
+ * ----------
+ */
+ToastBuffer *
+create_toast_buffer(int32 size, bool compressed)
+{
+	ToastBuffer *buf = (ToastBuffer *) palloc0(sizeof(ToastBuffer));
+
+	buf->buf = (char *) palloc(size);
+	if (compressed)
+	{
+		SET_VARSIZE_COMPRESSED(buf->buf, size);
+		/*
+		 * position may start out beyond limit here; consumers must make
+		 * sure position <= limit holds before they read anything.
+		 */
+		buf->position = VARDATA_4B_C(buf->buf);
+	}
+	else
+	{
+		SET_VARSIZE(buf->buf, size);
+		buf->position = VARDATA_4B(buf->buf);
+	}
+	buf->limit = VARDATA(buf->buf);
+	buf->capacity = buf->buf + size;
+
+	return buf;
+}
+
+void
+free_toast_buffer(ToastBuffer *buf)
+{
+	if (buf == NULL)
+		return;
+
+	pfree((void *)buf->buf);
+	pfree(buf);
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index dcc8d4f9c1b..4480eb88641 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -2106,8 +2106,9 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
 		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
+			int32 slen = bkpb->bimg_len;
+			if (pglz_decompress_ext(ptr, &slen, tmp.data,
+								BLCKSZ - bkpb->hole_length, true, true, false, NULL) < 0)
 				decomp_success = false;
 		}
 		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 11b442a5941..4486c004b73 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -78,6 +78,8 @@ typedef struct
 	 */
 	char	   *refpoint;		/* pointer within original haystack string */
 	int			refpos;			/* 0-based character offset of the same point */
+
+	DetoastIterator iter;
 } TextPositionState;
 
 typedef struct
@@ -134,7 +136,7 @@ static text *text_substring(Datum str,
 							int32 length,
 							bool length_not_specified);
 static text *text_overlay(text *t1, text *t2, int sp, int sl);
-static int	text_position(text *t1, text *t2, Oid collid);
+static int	text_position(text *t1, text *t2, Oid collid, DetoastIterator iter);
 static void text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state);
 static bool text_position_next(TextPositionState *state);
 static char *text_position_next_internal(char *start_ptr, TextPositionState *state);
@@ -847,10 +849,18 @@ text_overlay(text *t1, text *t2, int sp, int sl)
 Datum
 textpos(PG_FUNCTION_ARGS)
 {
-	text	   *str = PG_GETARG_TEXT_PP(0);
+	text	   *str;
 	text	   *search_str = PG_GETARG_TEXT_PP(1);
+	struct varlena *attr = (struct varlena *)
+								DatumGetPointer(PG_GETARG_DATUM(0));
+	DetoastIterator iter = create_detoast_iterator(attr);
+
+	if (iter != NULL)
+		str = (text *) iter->buf->buf;
+	else
+		str = PG_GETARG_TEXT_PP(0);
 
-	PG_RETURN_INT32((int32) text_position(str, search_str, PG_GET_COLLATION()));
+	PG_RETURN_INT32((int32) text_position(str, search_str, PG_GET_COLLATION(), iter));
 }
 
 /*
@@ -868,7 +878,7 @@ textpos(PG_FUNCTION_ARGS)
  *	functions.
  */
 static int
-text_position(text *t1, text *t2, Oid collid)
+text_position(text *t1, text *t2, Oid collid, DetoastIterator iter)
 {
 	TextPositionState state;
 	int			result;
@@ -885,6 +895,7 @@ text_position(text *t1, text *t2, Oid collid)
 		return 0;
 
 	text_position_setup(t1, t2, collid, &state);
+	state.iter = iter;
 	/* don't need greedy mode here */
 	state.greedy = false;
 
@@ -954,6 +965,7 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state)
 	state->last_match = NULL;
 	state->refpoint = state->str1;
 	state->refpos = 0;
+	state->iter = NULL;
 
 	/*
 	 * Prepare the skip table for Boyer-Moore-Horspool searching.  In these
@@ -1136,6 +1148,9 @@ text_position_next_internal(char *start_ptr, TextPositionState *state)
 		hptr = start_ptr;
 		while (hptr < haystack_end)
 		{
+			if (state->iter != NULL)
+				PG_DETOAST_ITERATE(state->iter, hptr);
+
 			/*
 			 * First check the common case that there is a match in the
 			 * haystack of exactly the length of the needle.
@@ -1175,6 +1190,9 @@ text_position_next_internal(char *start_ptr, TextPositionState *state)
 		hptr = start_ptr;
 		while (hptr < haystack_end)
 		{
+			if (state->iter != NULL)
+				PG_DETOAST_ITERATE(state->iter, hptr);
+
 			if (*hptr == nchar)
 				return (char *) hptr;
 			hptr++;
@@ -1192,6 +1210,9 @@ text_position_next_internal(char *start_ptr, TextPositionState *state)
 			const char *nptr;
 			const char *p;
 
+			if (state->iter != NULL)
+				PG_DETOAST_ITERATE(state->iter, hptr);
+
 			nptr = needle_last;
 			p = hptr;
 			while (*nptr == *p)
@@ -1260,6 +1281,7 @@ text_position_reset(TextPositionState *state)
 static void
 text_position_cleanup(TextPositionState *state)
 {
+	free_detoast_iterator(state->iter);
 	/* no cleanup needed */
 }
 
diff --git a/src/common/pg_lzcompress.c b/src/common/pg_lzcompress.c
index 86354d660fd..d5c690da7b4 100644
--- a/src/common/pg_lzcompress.c
+++ b/src/common/pg_lzcompress.c
@@ -675,9 +675,17 @@ pglz_compress(const char *source, int32 slen, char *dest,
 	return result_size;
 }
 
+/* Opaque pglz decompression state */
+typedef struct pglz_state
+{
+	int32		len;
+	int32		off;
+	int			ctrlc;
+	unsigned char ctrl;
+} pglz_state;
 
 /* ----------
- * pglz_decompress -
+ * pglz_decompress_ext -
  *
  *		Decompresses source into dest. Returns the number of bytes
  *		decompressed into the destination buffer, or -1 if the
@@ -686,21 +694,69 @@ pglz_compress(const char *source, int32 slen, char *dest,
  *		If check_complete is true, the data is considered corrupted
  *		if we don't exactly fill the destination buffer.  Callers that
  *		are extracting a slice typically can't apply this check.
+ *
+ *		pglz_decompress modified for usage by detoast iterator
  * ----------
  */
 int32
-pglz_decompress(const char *source, int32 slen, char *dest,
-				int32 rawsize, bool check_complete)
+pglz_decompress_ext(const char *source, int32 *slen, char *dest,
+					  int32 rawsize, bool check_complete, bool last_source_chunk,
+					  bool is_iter, void **pstate)
 {
-	const unsigned char *sp;
-	const unsigned char *srcend;
-	unsigned char *dp;
-	unsigned char *destend;
+	pglz_state *state = pstate ? *pstate : NULL;
+	const unsigned char *sp = (const unsigned char *) source;
+	const unsigned char *srcend = sp + *slen;
+	unsigned char *dp = (unsigned char *) dest;
+	unsigned char *destend = dp + rawsize;
+	unsigned char ctrl;
+	int			ctrlc;
+	int32		len;
+	int32		remlen;
+	int32		off;
+
+	if (state)
+	{
+		ctrl = state->ctrl;
+		ctrlc = state->ctrlc;
+
+		if (state->len)
+		{
+			int32		copylen;
+
+			len = state->len;
+			off = state->off;
+
+			copylen = Min(len, destend - dp);
+			remlen = len - copylen;
+			while (copylen--)
+			{
+				*dp = dp[-off];
+				dp++;
+			}
+
+			if (dp >= destend)
+			{
+				state->len = remlen;
+				*slen = 0;
+				return (char *) dp - dest;
+			}
+
+			Assert(remlen == 0);
+		}
 
-	sp = (const unsigned char *) source;
-	srcend = ((const unsigned char *) source) + slen;
-	dp = (unsigned char *) dest;
-	destend = dp + rawsize;
+		remlen = 0;
+		off = 0;
+
+		if (ctrlc < 8 && sp < srcend && dp < destend)
+			goto ctrl_loop;
+	}
+	else
+	{
+		ctrl = 0;
+		ctrlc = 8;
+		remlen = 0;
+		off = 0;
+	}
 
 	while (sp < srcend && dp < destend)
 	{
@@ -708,13 +764,15 @@ pglz_decompress(const char *source, int32 slen, char *dest,
 		 * Read one control byte and process the next 8 items (or as many as
 		 * remain in the compressed input).
 		 */
-		unsigned char ctrl = *sp++;
-		int			ctrlc;
+		ctrl = *sp++;
 
 		for (ctrlc = 0; ctrlc < 8 && sp < srcend && dp < destend; ctrlc++)
 		{
+ctrl_loop:
 			if (ctrl & 1)
 			{
+				int32		copylen;
+
 				/*
 				 * Set control bit means we must read a match tag. The match
 				 * is coded with two bytes. First byte uses lower nibble to
@@ -724,9 +782,6 @@ pglz_decompress(const char *source, int32 slen, char *dest,
 				 * extension tag byte tells how much longer the match really
 				 * was (0-255).
 				 */
-				int32		len;
-				int32		off;
-
 				len = (sp[0] & 0x0f) + 3;
 				off = ((sp[0] & 0xf0) << 4) | sp[1];
 				sp += 2;
@@ -735,39 +790,34 @@ pglz_decompress(const char *source, int32 slen, char *dest,
 
 				/*
 				 * Check for corrupt data: if we fell off the end of the
-				 * source, or if we obtained off = 0, or if off is more than
-				 * the distance back to the buffer start, we have problems.
-				 * (We must check for off = 0, else we risk an infinite loop
-				 * below in the face of corrupt data.  Likewise, the upper
-				 * limit on off prevents accessing outside the buffer
-				 * boundaries.)
+				 * source, or if we obtained off = 0, we have problems.  (We
+				 * must check this, else we risk an infinite loop below in the
+				 * face of corrupt data.)
 				 */
-				if (unlikely(sp > srcend || off == 0 ||
-							 off > (dp - (unsigned char *) dest)))
+				if (unlikely((sp > srcend && last_source_chunk) || off == 0
+					|| (!is_iter && off > (dp - (unsigned char *) dest))))
 					return -1;
 
 				/*
 				 * Don't emit more data than requested.
 				 */
-				len = Min(len, destend - dp);
+				copylen = Min(len, destend - dp);
+				remlen = len - copylen;
 
 				/*
 				 * Now we copy the bytes specified by the tag from OUTPUT to
-				 * OUTPUT (copy len bytes from dp - off to dp).  The copied
-				 * areas could overlap, so to avoid undefined behavior in
-				 * memcpy(), be careful to copy only non-overlapping regions.
-				 *
-				 * Note that we cannot use memmove() instead, since while its
-				 * behavior is well-defined, it's also not what we want.
+				 * OUTPUT (copy len bytes from dp - off to dp). The copied
+				 * areas could overlap; to prevent possible uncertainty, we
+				 * copy only non-overlapping regions.
 				 */
-				while (off < len)
+				while (off < copylen)
 				{
 					/*
 					 * We can safely copy "off" bytes since that clearly
 					 * results in non-overlapping source and destination.
 					 */
 					memcpy(dp, dp - off, off);
-					len -= off;
+					copylen -= off;
 					dp += off;
 
 					/*----------
@@ -796,8 +846,8 @@ pglz_decompress(const char *source, int32 slen, char *dest,
 					 */
 					off += off;
 				}
-				memcpy(dp, dp - off, len);
-				dp += len;
+				memcpy(dp, dp - off, copylen);
+				dp += copylen;
 			}
 			else
 			{
@@ -821,13 +871,25 @@ pglz_decompress(const char *source, int32 slen, char *dest,
 	if (check_complete && (dp != destend || sp != srcend))
 		return -1;
 
+	if (pstate)
+	{
+		if (!state)
+			*pstate = state = palloc(sizeof(*state));
+
+		state->ctrl = ctrl;
+		state->ctrlc = ctrlc;
+		state->len = remlen;
+		state->off = off;
+
+		*slen = (const char *) sp - source;
+	}
+
 	/*
 	 * That's it.
 	 */
 	return (char *) dp - dest;
 }
 
-
 /* ----------
  * pglz_maximum_compressed_size -
  *
diff --git a/src/include/access/detoast.h b/src/include/access/detoast.h
index e603a2276c3..36e9373ff21 100644
--- a/src/include/access/detoast.h
+++ b/src/include/access/detoast.h
@@ -12,6 +12,11 @@
 #ifndef DETOAST_H
 #define DETOAST_H
 
+#include "postgres.h"
+#include "access/genam.h"
+#include "access/toast_compression.h"
+#include "utils/relcache.h"
+
 /*
  * Macro to fetch the possibly-unaligned contents of an EXTERNAL datum
  * into a local "struct varatt_external" toast pointer.  This should be
@@ -33,6 +38,70 @@ do { \
 /* Size of an EXTERNAL datum that contains an indirection pointer */
 #define INDIRECT_POINTER_SIZE (VARHDRSZ_EXTERNAL + sizeof(varatt_indirect))
 
+/*
+ * A TOAST buffer is a producer-consumer buffer.
+ *
+ *    +--+--+--+--+--+--+--+--+--+--+--+--+--+
+ *    |  |  |  |  |  |  |  |  |  |  |  |  |  |
+ *    +--+--+--+--+--+--+--+--+--+--+--+--+--+
+ *    ^           ^           ^              ^
+ *   buf      position      limit         capacity
+ *
+ * buf: points to the start of the buffer.
+ * position: points to the next char to be consumed.
+ * limit: points to the next char to be produced.
+ * capacity: points to the end of the buffer.
+ *
+ * Constraints that need to be satisfied:
+ * buf <= position <= limit <= capacity
+ */
+typedef struct ToastBuffer
+{
+	char	*buf;
+	char	*position;
+	char		*limit;
+	char	*capacity;
+} ToastBuffer;
+
+typedef struct FetchDatumIteratorData
+{
+	ToastBuffer	*buf;
+	Relation	toastrel;
+	Relation	*toastidxs;
+	MemoryContext mcxt;
+	SysScanDesc	toastscan;
+	ScanKeyData	toastkey;
+	SnapshotData			snapshot;
+	struct varatt_external toast_pointer;
+	int32		ressize;
+	int32		nextidx;
+	int32		numchunks;
+	int			num_indexes;
+	bool		done;
+} FetchDatumIteratorData;
+
+typedef struct FetchDatumIteratorData *FetchDatumIterator;
+
+typedef struct DetoastIteratorData *DetoastIterator;
+
+typedef struct DetoastIteratorData
+{
+	ToastBuffer 		*buf;
+	FetchDatumIterator	fetch_datum_iterator;
+	DetoastIterator	   *self_ptr;
+	int					nrefs;
+	void			   *decompression_state;
+	ToastCompressionId	compression_method;
+	bool				compressed;		/* toast value is compressed? */
+	bool				done;
+} DetoastIteratorData;
+
+extern FetchDatumIterator create_fetch_datum_iterator(struct varlena *attr);
+extern void free_fetch_datum_iterator(FetchDatumIterator iter);
+extern void fetch_datum_iterate(FetchDatumIterator iter);
+extern ToastBuffer *create_toast_buffer(int32 size, bool compressed);
+extern void free_toast_buffer(ToastBuffer *buf);
+
 /* ----------
  * detoast_external_attr() -
  *
@@ -79,4 +148,34 @@ extern Size toast_raw_datum_size(Datum value);
  */
 extern Size toast_datum_size(Datum value);
 
+/* ----------
+ * create_detoast_iterator -
+ *
+ * Create an iterator for an on-disk external or in-line compressed value;
+ * returns NULL for any other kind of value.
+ * ----------
+ */
+extern DetoastIterator create_detoast_iterator(struct varlena *attr);
+
+/* ----------
+ * free_detoast_iterator -
+ *
+ * Free memory used by the de-TOAST iterator, including buffers and
+ * fetch datum iterator.
+ * ----------
+ */
+extern void free_detoast_iterator(DetoastIterator iter);
+
+/* ----------
+ * detoast_iterate -
+ *
+ * Iterate through the toasted value referenced by iterator.
+ *
+ * As long as there is another data chunk in external storage,
+ * de-TOAST it into iterator's toast buffer.
+ * ----------
+ */
+extern void
+detoast_iterate(DetoastIterator detoast_iter, const char *destend);
+
 #endif							/* DETOAST_H */
diff --git a/src/include/access/toast_compression.h b/src/include/access/toast_compression.h
index 13c4612ceed..0c64977632d 100644
--- a/src/include/access/toast_compression.h
+++ b/src/include/access/toast_compression.h
@@ -52,6 +52,11 @@ typedef enum ToastCompressionId
 
 #define CompressionMethodIsValid(cm)  ((cm) != InvalidCompressionMethod)
 
+#define NO_LZ4_SUPPORT() \
+	ereport(ERROR, \
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+			 errmsg("compression method lz4 not supported"), \
+			 errdetail("This functionality requires the server to be built with lz4 support.")))
 
 /* pglz compression/decompression routines */
 extern struct varlena *pglz_compress_datum(const struct varlena *value);
diff --git a/src/include/common/pg_lzcompress.h b/src/include/common/pg_lzcompress.h
index 2a12b33a008..bd006c9c704 100644
--- a/src/include/common/pg_lzcompress.h
+++ b/src/include/common/pg_lzcompress.h
@@ -89,5 +89,13 @@ extern int32 pglz_decompress(const char *source, int32 slen, char *dest,
 							 int32 rawsize, bool check_complete);
 extern int32 pglz_maximum_compressed_size(int32 rawsize,
 										  int32 total_compressed_size);
+/*
+ * Resumable pglz_decompress for the detoast iterator: with pstate != NULL,
+ * *slen returns the source bytes consumed and *pstate the saved state.
+ */
+extern int32 pglz_decompress_ext(const char *source, int32 *slen, char *dest,
+								 int32 rawsize, bool check_complete,
+								 bool last_source_chunk, bool is_iter,
+								 void **pstate);
 
 #endif							/* _PG_LZCOMPRESS_H_ */
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index 0fe7b4ebc77..25830797f04 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -237,6 +237,19 @@ extern struct varlena *pg_detoast_datum_slice(struct varlena *datum,
 											  int32 first, int32 count);
 extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
 
+/*
+ * De-TOAST iteratively until all bytes before "need", a pointer into the
+ * iterator's ToastBuffer, are available (or the iterator is done).  The
+ * arguments are evaluated repeatedly, so they must have no side effects.
+ */
+#define PG_DETOAST_ITERATE(iter, need)											\
+	do {																		\
+		Assert((need) >= (iter)->buf->buf && (need) <= (iter)->buf->capacity);	\
+		while (!(iter)->done && (need) > (iter)->buf->limit) {					\
+			detoast_iterate((iter), (need));									\
+		}																		\
+	} while (0)
+
 #define PG_DETOAST_DATUM(datum) \
 	pg_detoast_datum((struct varlena *) DatumGetPointer(datum))
 #define PG_DETOAST_DATUM_COPY(datum) \
-- 
2.34.1

