https://www.postgresql.org/docs/devel/libpq-pipeline-mode.html

strace -s999 -fe sendto,recvfrom python3.6 -c "import pg; db=pg.DB('host=/tmp 
dbname=postgres'); db.set_non_blocking(True); db.enter_pipeline_mode(); 
q=db.send_query('SELECT generate_series(1,9)'); q2=db.send_query('SELECT 
generate_series(1,3)'); db.sync_pipeline(); print('poll',db.poll()); 
print(q.getresult()); print('Should be None:', q.getresult()); 
print(q.getresult() )"

sendto(3, "P\0\0\0#\0SELECT 
generate_series(1,9)\0\0\0B\0\0\0\f\0\0\0\0\0\0\0\0D\0\0\0\6P\0E\0\0\0\t\0\0\0\0\0P\0\0\0#\0SELECT
 
generate_series(1,3)\0\0\0B\0\0\0\f\0\0\0\0\0\0\0\0D\0\0\0\6P\0E\0\0\0\t\0\0\0\0\0S\0\0\0\4",
 137, MSG_NOSIGNAL, NULL, 0) = 137

poll 3
[(1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)]
Should be None: None
[(1,), (2,), (3,)]

This is a minimal POC patch - please don't merge it.
This already seems to do all that we'd want at Telsasoft, for running many
INSERT ON CONFLICT.

It's a bit weird that the python query objects are the result of a specific
query, but in pipeline mode, any query object will read the results of the next
(unrelated) query.

I'm not sure if pygres should do something higher level beyond just making the
C API calls.  Like maybe we should wrap send_query(), and keep a stack of
result objects and make sure that getresult() is called the right number of
times.  But I'm afraid anything we add may impede other flexibility, like using
single row mode.  So maybe any additional interface would be an optional layer
on top to be added in the future...

diff --git a/pgconn.c b/pgconn.c
index 0d6b980..fb74fd0 100644
--- a/pgconn.c
+++ b/pgconn.c
@@ -1584,6 +1584,37 @@ conn_dir(connObject *self, PyObject *noargs)
     return attrs;
 }
 
+// XXX: PQexitPipelineMode PQpipelineStatus
+// TODO: rst docs
+static char conn_enter_pipeline__doc__[] =
+"enter_pipeline() - begin pipeline mode";
+
+static PyObject *
+conn_enter_pipeline(connObject *self, PyObject *noargs)
+{
+    Py_BEGIN_ALLOW_THREADS
+    PQenterPipelineMode(self->cnx);
+    Py_END_ALLOW_THREADS
+
+    Py_INCREF(Py_None);
+    return Py_None;
+}
+
+static char conn_sync_pipeline__doc__[] =
+"pipeline_sync() - establish a synchronization point in the pipeline\n\n"
+"This sends queued commands to the server.";
+
+static PyObject *
+conn_sync_pipeline(connObject *self, PyObject *noargs)
+{
+    Py_BEGIN_ALLOW_THREADS
+    PQpipelineSync(self->cnx);
+    Py_END_ALLOW_THREADS
+
+    Py_INCREF(Py_None);
+    return Py_None;
+}
+
 /* Connection object methods */
 static struct PyMethodDef conn_methods[] = {
     {"__dir__", (PyCFunction) conn_dir,  METH_NOARGS, NULL},
@@ -1593,9 +1624,9 @@ static struct PyMethodDef conn_methods[] = {
     {"query", (PyCFunction) conn_query,
         METH_VARARGS, conn_query__doc__},
     {"send_query", (PyCFunction) conn_send_query,
-        METH_VARARGS, conn_send_query__doc__},
+        METH_VARARGS|METH_KEYWORDS, conn_send_query__doc__},
     {"query_prepared", (PyCFunction) conn_query_prepared,
-        METH_VARARGS, conn_query_prepared__doc__},
+        METH_VARARGS|METH_KEYWORDS, conn_query_prepared__doc__},
     {"prepare", (PyCFunction) conn_prepare,
         METH_VARARGS, conn_prepare__doc__},
     {"describe_prepared", (PyCFunction) conn_describe_prepared,
@@ -1662,6 +1693,13 @@ static struct PyMethodDef conn_methods[] = {
         METH_VARARGS, conn_loimport__doc__},
 #endif /* LARGE_OBJECTS */
 
+#ifdef PIPELINE_MODE
+    {"enter_pipeline_mode", (PyCFunction) conn_enter_pipeline,
+        METH_VARARGS, conn_enter_pipeline__doc__},
+    {"sync_pipeline", (PyCFunction) conn_sync_pipeline,
+        METH_VARARGS, conn_sync_pipeline__doc__},
+#endif /* PIPELINE_MODE */
+
     {NULL, NULL} /* sentinel */
 };
 
diff --git a/pgquery.c b/pgquery.c
index ffa4e87..b709690 100644
--- a/pgquery.c
+++ b/pgquery.c
@@ -140,7 +140,7 @@ _get_async_result(queryObject *self, int keep) {
         if (!self->result) {
             /* end of result set, return None */
             Py_DECREF(self->pgcnx);
-            self->pgcnx = NULL;
+            // self->pgcnx = NULL;
             Py_INCREF(Py_None);
             return Py_None;
         }
diff --git a/setup.py b/setup.py
index ed02c40..b825d69 100755
--- a/setup.py
+++ b/setup.py
@@ -146,6 +146,7 @@ class build_pg_ext(build_ext):
         self.pqlib_info = None
         self.ssl_info = None
         self.memory_size = None
+        self.pipeline_mode = None
         supported = pg_version >= (9, 0)
         if not supported:
             warnings.warn(
@@ -194,6 +195,16 @@ class build_pg_ext(build_ext):
                 warnings.warn(
                     "The installed PostgreSQL version"
                     " does not support the memory size function.")
+
+        wanted = self.pipeline_mode
+        supported = pg_version >= (14, 0)
+        if wanted or (wanted is None and supported):
+            define_macros.append(('PIPELINE_MODE', None))
+            if not supported:
+                warnings.warn(
+                    "The installed PostgreSQL version"
+                    " does not support pipeline mode.")
+
         if sys.platform == 'win32':
             libraries[0] = 'lib' + libraries[0]
             if os.path.exists(os.path.join(
_______________________________________________
PyGreSQL mailing list
PyGreSQL@Vex.Net
https://mail.vex.net/mailman/listinfo/pygresql

Reply via email to