https://www.postgresql.org/docs/devel/libpq-pipeline-mode.html
strace -s999 -fe sendto,recvfrom python3.6 -c "import pg; db=pg.DB('host=/tmp dbname=postgres'); db.set_non_blocking(True); db.enter_pipeline_mode(); q=db.send_query('SELECT generate_series(1,9)'); q2=db.send_query('SELECT generate_series(1,3)'); db.sync_pipeline(); print('poll',db.poll()); print(q.getresult()); print('Should be None:', q.getresult()); print(q.getresult() )" sendto(3, "P\0\0\0#\0SELECT generate_series(1,9)\0\0\0B\0\0\0\f\0\0\0\0\0\0\0\0D\0\0\0\6P\0E\0\0\0\t\0\0\0\0\0P\0\0\0#\0SELECT generate_series(1,3)\0\0\0B\0\0\0\f\0\0\0\0\0\0\0\0D\0\0\0\6P\0E\0\0\0\t\0\0\0\0\0S\0\0\0\4", 137, MSG_NOSIGNAL, NULL, 0) = 137 poll 3 [(1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)] Should be None: None [(1,), (2,), (3,)] This is a minimal POC patch - please don't merge it. This already seems to do all that we'd want at Telsasoft, for running many INSERT ON CONFLICT. It's a bit weird that the Python query objects are the result of a specific query, but in pipeline mode, any query object will read the results of the next (unrelated) query. I'm not sure if PyGreSQL should do something higher level beyond just making the C API calls. Like maybe we should wrap send_query(), and keep a queue of result objects and make sure that getresult() is called the right number of times. But I'm afraid anything we add may impede other flexibility, like using single-row mode. So maybe any additional interface would be an optional layer on top to be added in the future... 
diff --git a/pgconn.c b/pgconn.c index 0d6b980..fb74fd0 100644 --- a/pgconn.c +++ b/pgconn.c @@ -1584,6 +1584,37 @@ conn_dir(connObject *self, PyObject *noargs) return attrs; } +// XXX: PQexitPipelineMode PQpipelineStatus +// TODO: rst docs +static char conn_enter_pipeline__doc__[] = +"enter_pipeline() - begin pipeline mode"; + +static PyObject * +conn_enter_pipeline(connObject *self, PyObject *noargs) +{ + Py_BEGIN_ALLOW_THREADS + PQenterPipelineMode(self->cnx); + Py_END_ALLOW_THREADS + + Py_INCREF(Py_None); + return Py_None; +} + +static char conn_sync_pipeline__doc__[] = +"pipeline_sync() - establish a synchronization point in the pipeline\n\n" +"This sends queued commands to the server."; + +static PyObject * +conn_sync_pipeline(connObject *self, PyObject *noargs) +{ + Py_BEGIN_ALLOW_THREADS + PQpipelineSync(self->cnx); + Py_END_ALLOW_THREADS + + Py_INCREF(Py_None); + return Py_None; +} + /* Connection object methods */ static struct PyMethodDef conn_methods[] = { {"__dir__", (PyCFunction) conn_dir, METH_NOARGS, NULL}, @@ -1593,9 +1624,9 @@ static struct PyMethodDef conn_methods[] = { {"query", (PyCFunction) conn_query, METH_VARARGS, conn_query__doc__}, {"send_query", (PyCFunction) conn_send_query, - METH_VARARGS, conn_send_query__doc__}, + METH_VARARGS|METH_KEYWORDS, conn_send_query__doc__}, {"query_prepared", (PyCFunction) conn_query_prepared, - METH_VARARGS, conn_query_prepared__doc__}, + METH_VARARGS|METH_KEYWORDS, conn_query_prepared__doc__}, {"prepare", (PyCFunction) conn_prepare, METH_VARARGS, conn_prepare__doc__}, {"describe_prepared", (PyCFunction) conn_describe_prepared, @@ -1662,6 +1693,13 @@ static struct PyMethodDef conn_methods[] = { METH_VARARGS, conn_loimport__doc__}, #endif /* LARGE_OBJECTS */ +#ifdef PIPELINE_MODE + {"enter_pipeline_mode", (PyCFunction) conn_enter_pipeline, + METH_VARARGS, conn_enter_pipeline__doc__}, + {"sync_pipeline", (PyCFunction) conn_sync_pipeline, + METH_VARARGS, conn_sync_pipeline__doc__}, +#endif /* PIPELINE_MODE */ + 
{NULL, NULL} /* sentinel */ }; diff --git a/pgquery.c b/pgquery.c index ffa4e87..b709690 100644 --- a/pgquery.c +++ b/pgquery.c @@ -140,7 +140,7 @@ _get_async_result(queryObject *self, int keep) { if (!self->result) { /* end of result set, return None */ Py_DECREF(self->pgcnx); - self->pgcnx = NULL; + // self->pgcnx = NULL; Py_INCREF(Py_None); return Py_None; } diff --git a/setup.py b/setup.py index ed02c40..b825d69 100755 --- a/setup.py +++ b/setup.py @@ -146,6 +146,7 @@ class build_pg_ext(build_ext): self.pqlib_info = None self.ssl_info = None self.memory_size = None + self.pipeline_mode = None supported = pg_version >= (9, 0) if not supported: warnings.warn( @@ -194,6 +195,16 @@ class build_pg_ext(build_ext): warnings.warn( "The installed PostgreSQL version" " does not support the memory size function.") + + wanted = self.pipeline_mode + supported = pg_version >= (14, 0) + if wanted or (wanted is None and supported): + define_macros.append(('PIPELINE_MODE', None)) + if not supported: + warnings.warn( + "The installed PostgreSQL version" + " does not support pipeline mode.") + if sys.platform == 'win32': libraries[0] = 'lib' + libraries[0] if os.path.exists(os.path.join( _______________________________________________ PyGreSQL mailing list PyGreSQL@Vex.Net https://mail.vex.net/mailman/listinfo/pygresql