On 3/19/19 4:44 PM, Chris Travers wrote:
>
>
> On Tue, Mar 19, 2019 at 12:19 PM Tomas Vondra
> <[email protected]> wrote:
>
>
> On 3/19/19 10:59 AM, Chris Travers wrote:
> >
> >
> > I'm not going to discuss whether any particular committer should
> > pick this up, but I want to describe an important use case we have
> > at Adjust for this sort of patch.
> >
> > The PostgreSQL compression strategy is something we find inadequate
> > for at least one of our large deployments (a large debug log spanning
> > 10PB+). Our current solution is to set storage so that it does not
> > compress (i.e. SET STORAGE EXTERNAL) and then run on ZFS to get
> > compression speedups on spinning disks.
> >
> > But running PostgreSQL on ZFS has some annoying costs because we
> > have copy-on-write on copy-on-write, and when you add file
> > fragmentation... I would really like to be able to get away from
> > having to do ZFS as an underlying filesystem. While we have good
> > write throughput, read throughput is not as good as I would like.
> >
> > An approach that would give us better row-level compression would
> > allow us to ditch the COW-filesystem-under-PostgreSQL approach.
> >
> > So I think the benefits are actually quite high, particularly for
> > those dealing with volume/variety problems where things like JSONB
> > might be a go-to solution. Similarly, I could see systems that
> > handle large amounts of specialized text using extensions for
> > dealing with it.
> >
>
> Sure, I don't disagree - the proposed compression approach may be a big
> win for some deployments further down the road, no doubt about it. But
> as I said, it's unclear when we get there (or if the interesting stuff
> will be in some sort of extension, which I don't oppose in principle).
>
>
> I would assume that if extensions are particularly stable and useful
> they could be moved into core.
>
> But I would also assume that at first, this area would be sufficiently
> experimental that folks (like us) would write our own extensions for it.
>
>
>
> >
> > But hey, I think there are committers working for postgrespro who
> > might have the motivation to get this over the line. Of course, that
> > assumes there are no serious objections to having this functionality
> > or to how it's implemented ... but I don't think that was the case.
> >
> >
> > While I am not currently able to speak to questions of how it is
> > implemented, I can say with very little doubt that we would almost
> > certainly use this functionality if it were there, and I can see
> > plenty of other cases where this would be a very appropriate
> > direction for some other projects as well.
> >
> Well, I guess the best thing you can do to move this patch forward is to
> actually try that on your real-world use case, and report your results
> and possibly do a review of the patch.
>
>
> Yeah, I expect to do this within the next month or two.
>
>
>
> IIRC there was an extension [1] leveraging this custom compression
> interface for better jsonb compression, so perhaps that would work for
> you (not sure if it's up to date with the current patch, though).
>
> [1]
> https://www.postgresql.org/message-id/20171130182009.1b492eb2%40wp.localdomain
>
> Yeah I will be looking at a couple different approaches here and
> reporting back. I don't expect it will be a full production workload but
> I do expect to be able to report on benchmarks in both storage and
> performance.
>
FWIW I was a bit curious how that jsonb compression would affect the
data set I'm using for testing jsonpath patches, so I spent a bit of
time getting it to work with master. The attached patch gets it to
compile, but unfortunately it then fails like this:

    ERROR: jsonbd: worker has detached

It seems there's some bug in how shm_mq is used, but I don't have time
to investigate that further.
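
For anyone poking at the same thing: jsonbd learns about detached queues
by peeking into the server's private shm_mq struct through a hand-copied
mirror (shm_mq_alt, updated in the patch below), so any drift between the
mirror and the server's actual layout makes those reads garbage - which
is one plausible source of this kind of failure. A minimal sketch of the
pattern, with the field set assumed from the patch below and the function
name illustrative (the real definition is private to storage/ipc/shm_mq.c):

    #include "postgres.h"

    #include "port/atomics.h"
    #include "storage/proc.h"
    #include "storage/shm_mq.h"     /* shm_mq is opaque here */
    #include "storage/spin.h"

    /* Hand-copied prefix of the server's private struct shm_mq (v12-era
     * layout, per the patch below); trailing ring members elided, since
     * only the offsets up to mq_detached matter for this read. */
    struct shm_mq_alt
    {
        slock_t             mq_mutex;
        PGPROC             *mq_receiver;
        PGPROC             *mq_sender;
        pg_atomic_uint64    mq_bytes_read;
        pg_atomic_uint64    mq_bytes_written;
        Size                mq_ring_size;
        bool                mq_detached;
    };

    static bool
    queue_is_detached(shm_mq *mq)
    {
        /* Correct only while the mirrored layout matches the server's;
         * nothing catches drift at compile time. */
        return ((struct shm_mq_alt *) mq)->mq_detached;
    }

That fragility is presumably why the original source had the #if/#error
version guard, which the patch below removes to get things building on
master.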
regards
--
Tomas Vondra http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/jsonbd.c b/jsonbd.c
index 511cfcf..9b86e1f 100644
--- a/jsonbd.c
+++ b/jsonbd.c
@@ -766,11 +766,6 @@ jsonbd_cminitstate(Oid acoid, List *options)
return NULL;
}
-static void
-jsonbd_cmdrop(Oid acoid)
-{
- /* TODO: if there is no compression options, remove the dictionary */
-}
static struct varlena *
jsonbd_cmdecompress(CompressionAmOptions *cmoptions, const struct varlena *data)
@@ -863,7 +858,6 @@ jsonbd_compression_handler(PG_FUNCTION_ARGS)
CompressionAmRoutine *routine = makeNode(CompressionAmRoutine);
routine->cmcheck = jsonbd_cmcheck;
- routine->cmdrop = jsonbd_cmdrop; /* no drop behavior */
routine->cminitstate = jsonbd_cminitstate;
routine->cmcompress = jsonbd_cmcompress;
routine->cmdecompress = jsonbd_cmdecompress;
diff --git a/jsonbd_utils.c b/jsonbd_utils.c
index 3ab9347..029e2e3 100644
--- a/jsonbd_utils.c
+++ b/jsonbd_utils.c
@@ -2,9 +2,11 @@
#include "jsonbd_utils.h"
#include "postgres.h"
+#include "access/genam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "access/sysattr.h"
+#include "access/table.h"
#include "catalog/indexing.h"
#include "catalog/pg_extension.h"
#include "commands/extension.h"
@@ -13,22 +15,20 @@
#include "utils/fmgroids.h"
#include "utils/rel.h"
-#if PG_VERSION_NUM == 110000
struct shm_mq_alt
{
slock_t mq_mutex;
- PGPROC *mq_receiver; /* we need this one */
- PGPROC *mq_sender; /* this one */
- uint64 mq_bytes_read;
- uint64 mq_bytes_written;
+ PGPROC *mq_receiver;
+ PGPROC *mq_sender;
+ pg_atomic_uint64 mq_bytes_read;
+ pg_atomic_uint64 mq_bytes_written;
Size mq_ring_size;
- bool mq_detached; /* and this one */
+ bool mq_detached;
+ uint8 mq_ring_offset;
+ char mq_ring[FLEXIBLE_ARRAY_MEMBER];
/* in postgres version there are more attributes, but we don't need them */
};
-#else
-#error "shm_mq struct in jsonbd is copied from PostgreSQL 11, please correct it according to your version"
-#endif
/**
* Get 32-bit Murmur3 hash. Ported from qLibc library.
@@ -153,7 +153,7 @@ get_jsonbd_schema(void)
return InvalidOid; /* exit if pg_pathman does not exist */
ScanKeyInit(&entry[0],
- ObjectIdAttributeNumber,
+ Anum_pg_extension_oid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(ext_oid));
diff --git a/jsonbd_worker.c b/jsonbd_worker.c
index e9cb5b6..c59e102 100644
--- a/jsonbd_worker.c
+++ b/jsonbd_worker.c
@@ -13,11 +13,11 @@
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
-#include "catalog/pg_compression_opt.h"
#include "catalog/pg_extension.h"
#include "commands/extension.h"
#include "commands/dbcommands.h"
#include "executor/spi.h"
+#include "access/tableam.h"
#include "port/atomics.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@@ -32,7 +32,6 @@
#include "utils/rel.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
-#include "utils/tqual.h"
#include "utils/syscache.h"
static bool xact_started = false;
@@ -125,7 +124,7 @@ init_worker(dsm_segment *seg)
worker_args = (jsonbd_worker_args *) dsm_segment_address(seg);
/* Connect to our database */
- BackgroundWorkerInitializeConnectionByOid(worker_args->dboid, InvalidOid);
+ BackgroundWorkerInitializeConnectionByOid(worker_args->dboid, InvalidOid, 0);
worker_state = shm_toc_lookup(toc, worker_args->worker_num, false);
worker_state->proc = MyProc;
@@ -205,8 +204,8 @@ jsonbd_get_key(Relation rel, Relation indrel, Oid cmoptoid, uint32 key_id)
IndexScanDesc scan;
ScanKeyData skey[2];
Datum key_datum;
- HeapTuple tup;
bool isNull;
+ TupleTableSlot *slot;
MemoryContext old_mcxt;
char *res;
@@ -224,19 +223,22 @@ jsonbd_get_key(Relation rel, Relation indrel, Oid cmoptoid, uint32 key_id)
scan = index_beginscan(rel, indrel, SnapshotAny, 2, 0);
index_rescan(scan, skey, 2, NULL, 0);
- tup = index_getnext(scan, ForwardScanDirection);
- if (tup == NULL)
+ slot = table_slot_create(rel, NULL);
+
+ if (!index_getnext_slot(scan, ForwardScanDirection, slot))
elog(ERROR, "key not found for cmopt=%d and id=%d", cmoptoid, key_id);
- key_datum = heap_getattr(tup, JSONBD_DICTIONARY_REL_ATT_KEY,
- RelationGetDescr(rel), &isNull);
+ key_datum = slot_getattr(slot, JSONBD_DICTIONARY_REL_ATT_KEY, &isNull);
Assert(!isNull);
old_mcxt = MemoryContextSwitchTo(worker_context);
res = TextDatumGetCString(key_datum);
MemoryContextSwitchTo(old_mcxt);
+ ExecDropSingleTupleTableSlot(slot);
+
index_endscan(scan);
+
return res;
}
@@ -308,7 +310,7 @@ jsonbd_get_key_id(Relation rel, Relation indrel, Oid cmoptoid, char *key)
{
IndexScanDesc scan;
ScanKeyData skey[2];
- HeapTuple tup;
+ TupleTableSlot *slot;
bool isNull;
uint32 result = 0;
@@ -326,16 +328,18 @@ jsonbd_get_key_id(Relation rel, Relation indrel, Oid cmoptoid, char *key)
scan = index_beginscan(rel, indrel, SnapshotAny, 2, 0);
index_rescan(scan, skey, 2, NULL, 0);
- tup = index_getnext(scan, ForwardScanDirection);
- if (tup != NULL)
+ slot = table_slot_create(rel, NULL);
+
+ if (index_getnext_slot(scan, ForwardScanDirection, slot))
{
- Datum dat = heap_getattr(tup, JSONBD_DICTIONARY_REL_ATT_ID,
- RelationGetDescr(rel), &isNull);
+ Datum dat = slot_getattr(slot, JSONBD_DICTIONARY_REL_ATT_ID, &isNull);
Assert(!isNull);
result = DatumGetInt32(dat);
}
index_endscan(scan);
+ ExecDropSingleTupleTableSlot(slot);
+
return result;
}
@@ -547,7 +551,7 @@ jsonbd_launcher_main(Datum arg)
BackgroundWorkerUnblockSignals();
/* Init this launcher as backend so workers can notify it */
- InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
+ InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL, false);
/* Create resource owner */
CurrentResourceOwner = ResourceOwnerCreate(NULL, "jsonbd_launcher_main");
@@ -821,10 +825,10 @@ jsonbd_register_worker(int worker_num, Oid dboid, int database_num)
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = 0;
worker.bgw_notify_pid = MyProcPid;
- memcpy(worker.bgw_library_name, "jsonbd", BGW_MAXLEN);
- memcpy(worker.bgw_function_name, CppAsString(jsonbd_worker_main), BGW_MAXLEN);
- snprintf(worker.bgw_name, BGW_MAXLEN, "jsonbd, worker %d, db: %d",
- worker_num, dboid);
+ strcpy(worker.bgw_library_name, "jsonbd");
+ strcpy(worker.bgw_function_name, "jsonbd_worker_main");
+ strcpy(worker.bgw_name, "jsonbd worker");
+ strcpy(worker.bgw_type, "jsonbd worker");
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
/* Start dynamic worker */
@@ -835,11 +839,7 @@ jsonbd_register_worker(int worker_num, Oid dboid, int database_num)
}
/* Wait to be signalled. */
-#if PG_VERSION_NUM >= 100000
- WaitLatch(&hdr->launcher_latch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);
-#else
- WaitLatch(&hdr->launcher_latch, WL_LATCH_SET, 0);
-#endif
+ WaitLatch(&hdr->launcher_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, PG_WAIT_EXTENSION);
ResetLatch(&hdr->launcher_latch);
@@ -892,14 +892,14 @@ jsonbd_get_dictionary_relid(void)
Assert(relid != InvalidOid);
- rel = relation_open(relid, NoLock);
+ rel = relation_open(relid, AccessShareLock);
indexes = RelationGetIndexList(rel);
Assert(list_length(indexes) == 2);
foreach(lc, indexes)
{
Oid indOid = lfirst_oid(lc);
- Relation indRel = index_open(indOid, NoLock);
+ Relation indRel = index_open(indOid, AccessShareLock);
int attnum = indRel->rd_index->indkey.values[1];
if (attnum == JSONBD_DICTIONARY_REL_ATT_ID)
@@ -910,9 +910,9 @@ jsonbd_get_dictionary_relid(void)
jsonbd_keys_indoid = indOid;
}
- index_close(indRel, NoLock);
+ index_close(indRel, AccessShareLock);
}
- relation_close(rel, NoLock);
+ relation_close(rel, AccessShareLock);
}
finish_xact_command();
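
Porting note: the biggest mechanical change above is the v12 tableam
switch from the HeapTuple-returning index_getnext() to the slot-based
index_getnext_slot(). A minimal sketch of the new idiom as used here
(setup elided; the helper name is illustrative, not from jsonbd):

    #include "postgres.h"

    #include "access/genam.h"
    #include "access/tableam.h"
    #include "executor/tuptable.h"
    #include "utils/builtins.h"
    #include "utils/snapmgr.h"

    /* Fetch one text attribute through an index scan, v12-style. */
    static char *
    fetch_key_text(Relation rel, Relation indrel, ScanKeyData *skey,
                   int attnum)
    {
        TupleTableSlot *slot = table_slot_create(rel, NULL);
        IndexScanDesc   scan = index_beginscan(rel, indrel, SnapshotAny,
                                               2, 0);
        char           *res = NULL;
        bool            isnull;

        index_rescan(scan, skey, 2, NULL, 0);
        if (index_getnext_slot(scan, ForwardScanDirection, slot))
        {
            Datum d = slot_getattr(slot, attnum, &isnull);

            /* Copy the value out before dropping the slot - the datum
             * may point into slot-owned memory. */
            if (!isnull)
                res = TextDatumGetCString(d);
        }

        index_endscan(scan);
        ExecDropSingleTupleTableSlot(slot); /* slots are dropped explicitly */
        return res;
    }

Unlike index_getnext(), the slot-based API never hands back a HeapTuple
pointer, so callers that held onto the tuple now have to copy whatever
they need while the slot is still live, as jsonbd_get_key() does above.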