Author: cito
Date: Mon Jan  4 17:51:09 2016
New Revision: 693

Log:
Support copy_from() and copy_to() in pgdb

Modified:
   trunk/docs/changelog.rst
   trunk/docs/pgdb.rst
   trunk/module/pgdb.py
   trunk/module/pgmodule.c

Modified: trunk/docs/changelog.rst
==============================================================================
--- trunk/docs/changelog.rst    Sun Jan  3 09:11:21 2016        (r692)
+++ trunk/docs/changelog.rst    Mon Jan  4 17:51:09 2016        (r693)
@@ -16,6 +16,11 @@
   constructor functions, this should not cause any incompatibilities.
 - The DB-API 2 module now supports the callproc() cursor method. Note
   that output parameters are currently not replaced in the return value.
+- The DB-API 2 module now supports copy operations between data streams
+  on the client and database tables via the COPY command of PostgreSQL.
+  The cursor method copy_from() can be used to copy data from the client
+  to the database, and the cursor method copy_to() can be used to copy data
+  from the database to the client.
 - The 7-tuples returned by the description attribute of a pgdb cursor
   are now named tuples, i.e. their elements can be also accessed by name.
 - The tty parameter and attribute of database connections has been

Modified: trunk/docs/pgdb.rst
==============================================================================
--- trunk/docs/pgdb.rst Sun Jan  3 09:11:21 2016        (r692)
+++ trunk/docs/pgdb.rst Mon Jan  4 17:51:09 2016        (r693)
@@ -267,9 +267,10 @@
     This read-only attribute specifies the number of rows that the last
     :meth:`Cursor.execute` or :meth:`Cursor.executemany` call produced
     (for DQL statements like SELECT) or affected (for DML statements like
-    UPDATE or INSERT ). The attribute is -1 in case no such method call has
-    been performed on the cursor or the rowcount of the last operation
-    cannot be determined by the interface.
+    UPDATE or INSERT). It is also set by the :meth:`Cursor.copy_from` and
+    :meth:`Cursor.copy_to` methods. The attribute is -1 in case no such
+    method call has been performed on the cursor or the rowcount of the
+    last operation cannot be determined by the interface.
 
 close -- close the cursor
 -------------------------
@@ -436,6 +437,83 @@
 
    The following methods and attributes are not part of the DB-API 2 standard.
 
+.. method:: Cursor.copy_from(stream, table, [format], [sep], [null], [size], [columns])
+
+       Copy data from an input stream to the specified table
+
+    :param stream: the input stream
+        (must be a file-like object, a string or an iterable returning strings)
+    :param str table: the name of a database table
+    :param str format: the format of the data in the input stream,
+        can be ``'text'`` (the default), ``'csv'``, or ``'binary'``
+    :param str sep: a single character separator
+        (the default is ``'\t'`` for text and ``','`` for csv)
+    :param str null: the textual representation of the ``NULL`` value,
+        can also be an empty string (the default is ``'\\N'``)
+    :param int size: the size of the buffer when reading file-like objects
+    :param list columns: an optional list of column names
+    :returns: the cursor, so you can chain commands
+
+    :raises TypeError: parameters with wrong types
+    :raises ValueError: invalid parameters
+    :raises IOError: error when executing the copy operation
+
+This method can be used to copy data from an input stream on the client side
+to a database table on the server side using the ``COPY FROM`` command.
+The input stream can be provided in form of a file-like object (which must
+have a ``read()`` method), a string, or an iterable returning one row or
+multiple rows of input data on each iteration.
+
+The format must be text, csv or binary. The sep option sets the column
+separator (delimiter) used in the non binary formats. The null option sets
+the textual representation of ``NULL`` in the input.
+
+The size option sets the size of the buffer used when reading data from
+file-like objects.
+
+The copy operation can be restricted to a subset of columns. If no columns are
+specified, all of them will be copied.
+
+.. method:: Cursor.copy_to(stream, table, [format], [sep], [null], [decode], [columns])
+
+       Copy data from the specified table to an output stream
+
+    :param stream: the output stream (must be a file-like object or ``None``)
+    :param str table: the name of a database table or a ``SELECT`` query
+    :param str format: the format of the data in the output stream,
+        can be ``'text'`` (the default), ``'csv'``, or ``'binary'``
+    :param str sep: a single character separator
+        (the default is ``'\t'`` for text and ``','`` for csv)
+    :param str null: the textual representation of the ``NULL`` value,
+        can also be an empty string (the default is ``'\\N'``)
+    :param bool decode: whether decoded strings shall be returned
+        for non-binary formats (the default is True in Python 3)
+    :param list columns: an optional list of column names
+    :returns: a generator if stream is set to ``None``, otherwise the cursor
+
+    :raises TypeError: parameters with wrong types
+    :raises ValueError: invalid parameters
+    :raises IOError: error when executing the copy operation
+
+This method can be used to copy data from a database table on the server side
+to an output stream on the client side using the ``COPY TO`` command.
+
+The output stream can be provided in form of a file-like object (which must
+have a ``write()`` method). Alternatively, if ``None`` is passed as the
+output stream, the method will return a generator yielding one row of output
+data on each iteration.
+
+Output will be returned as byte strings unless you set decode to true.
+
+Note that you can also use a ``SELECT`` query instead of the table name.
+
+The format must be text, csv or binary. The sep option sets the column
+separator (delimiter) used in the non binary formats. The null option sets
+the textual representation of ``NULL`` in the output.
+
+The copy operation can be restricted to a subset of columns. If no columns are
+specified, all of them will be copied.
+
 .. method:: Cursor.row_factory(row)
 
     Process rows before they are returned

Modified: trunk/module/pgdb.py
==============================================================================
--- trunk/module/pgdb.py        Sun Jan  3 09:11:21 2016        (r692)
+++ trunk/module/pgdb.py        Mon Jan  4 17:51:09 2016        (r693)
@@ -89,6 +89,7 @@
 except NameError:  # Python >= 3.0
     basestring = (str, bytes)
 
+from collections import Iterable
 try:
     from collections import OrderedDict
 except ImportError:  # Python 2.6 or 3.0
@@ -432,6 +433,243 @@
         self.execute(query, parameters)
         return parameters
 
+    def copy_from(self, stream, table,
+            format=None, sep=None, null=None, size=None, columns=None):
+        """Copy data from an input stream to the specified table.
+
+        The input stream can be a file-like object with a read() method or
+        it can also be an iterable returning a row or multiple rows of input
+        on each iteration.
+
+        The format must be text, csv or binary. The sep option sets the
+        column separator (delimiter) used in the non binary formats.
+        The null option sets the textual representation of NULL in the input.
+
+        The size option sets the size of the buffer used when reading data
+        from file-like objects.
+
+        The copy operation can be restricted to a subset of columns. If no
+        columns are specified, all of them will be copied.
+
+        """
+        binary_format = format == 'binary'
+        try:
+            read = stream.read
+        except AttributeError:
+            if size:
+                raise ValueError("size must only be set for file-like objects")
+            if binary_format:
+                input_type = bytes
+                type_name = 'byte strings'
+            else:
+                input_type = basestring
+                type_name = 'strings'
+
+            if isinstance(stream, basestring):
+                if not isinstance(stream, input_type):
+                    raise ValueError("the input must be %s" % type_name)
+                if not binary_format:
+                    if isinstance(stream, str):
+                        if not stream.endswith('\n'):
+                            stream += '\n'
+                    else:
+                        if not stream.endswith(b'\n'):
+                            stream += b'\n'
+
+                def chunks():
+                    yield stream
+
+            elif isinstance(stream, Iterable):
+
+                def chunks():
+                    for chunk in stream:
+                        if not isinstance(chunk, input_type):
+                            raise ValueError(
+                                "input stream must consist of %s" % type_name)
+                        if isinstance(chunk, str):
+                            if not chunk.endswith('\n'):
+                                chunk += '\n'
+                        else:
+                            if not chunk.endswith(b'\n'):
+                                chunk += b'\n'
+                        yield chunk
+
+            else:
+                raise TypeError("need an input stream to copy from")
+        else:
+            if size is None:
+                size = 8192
+            if size > 0:
+                if not isinstance(size, int):
+                    raise TypeError("the size option must be an integer")
+
+                def chunks():
+                    while True:
+                        buffer = read(size)
+                        yield buffer
+                        if not buffer or len(buffer) < size:
+                            break
+
+            else:
+
+                def chunks():
+                    yield read()
+
+        if not table or not isinstance(table, basestring):
+            raise TypeError("need a table to copy to")
+        if table.lower().startswith('select'):
+            raise ValueError("must specify a table, not a query")
+        else:
+            table = '"%s"' % (table,)
+        operation = ['copy %s' % (table,)]
+        options = []
+        params = []
+        if format is not None:
+            if not isinstance(format, basestring):
+                raise TypeError("the format option must be a string")
+            if format not in ('text', 'csv', 'binary'):
+                raise ValueError("invalid format")
+            options.append('format %s' % (format,))
+        if sep is not None:
+            if not isinstance(sep, basestring):
+                raise TypeError("the sep option must be a string")
+            if format == 'binary':
+                raise ValueError("sep is not allowed with binary format")
+            if len(sep) != 1:
+                raise ValueError("sep must be a single one-byte character")
+            options.append('delimiter %s')
+            params.append(sep)
+        if null is not None:
+            if not isinstance(null, basestring):
+                raise TypeError("the null option must be a string")
+            options.append('null %s')
+            params.append(null)
+        if columns:
+            if not isinstance(columns, basestring):
+                columns = ','.join('"%s"' % (col,) for col in columns)
+            operation.append('(%s)' % (columns,))
+        operation.append("from stdin")
+        if options:
+            operation.append('(%s)' % ','.join(options))
+        operation = ' '.join(operation)
+
+        putdata = self._src.putdata
+        self.execute(operation, params)
+
+        try:
+            for chunk in chunks():
+                putdata(chunk)
+        except BaseException as error:
+            self.rowcount = -1
+            # the following call will re-raise the error
+            putdata(error)
+        else:
+            self.rowcount = putdata(None)
+
+        # return the cursor object, so you can chain operations
+        return self
+
+    def copy_to(self, stream, table,
+            format=None, sep=None, null=None, decode=None, columns=None):
+        """Copy data from the specified table to an output stream.
+
+        The output stream can be a file-like object with a write() method or
+        it can also be None, in which case the method will return a generator
+        yielding a row on each iteration.
+
+        Output will be returned as byte strings unless you set decode to true.
+
+        Note that you can also use a select query instead of the table name.
+
+        The format must be text, csv or binary. The sep option sets the
+        column separator (delimiter) used in the non binary formats.
+        The null option sets the textual representation of NULL in the output.
+
+        The copy operation can be restricted to a subset of columns. If no
+        columns are specified, all of them will be copied.
+
+        """
+        binary_format = format == 'binary'
+        if stream is not None:
+            try:
+                write = stream.write
+            except AttributeError:
+                raise TypeError("need an output stream to copy to")
+        if not table or not isinstance(table, basestring):
+            raise TypeError("need a table to copy to")
+        if table.lower().startswith('select'):
+            if columns:
+                raise ValueError("columns must be specified in the query")
+            table = '(%s)' % (table,)
+        else:
+            table = '"%s"' % (table,)
+        operation = ['copy %s' % (table,)]
+        options = []
+        params = []
+        if format is not None:
+            if not isinstance(format, basestring):
+                raise TypeError("the format option must be a string")
+            if format not in ('text', 'csv', 'binary'):
+                raise ValueError("invalid format")
+            options.append('format %s' % (format,))
+        if sep is not None:
+            if not isinstance(sep, basestring):
+                raise TypeError("the sep option must be a string")
+            if binary_format:
+                raise ValueError("sep is not allowed with binary format")
+            if len(sep) != 1:
+                raise ValueError("sep must be a single one-byte character")
+            options.append('delimiter %s')
+            params.append(sep)
+        if null is not None:
+            if not isinstance(null, basestring):
+                raise TypeError("the null option must be a string")
+            options.append('null %s')
+            params.append(null)
+        if decode is None:
+            if format == 'binary':
+                decode = False
+            else:
+                decode = str is unicode
+        else:
+            if not isinstance(decode, (int, bool)):
+                raise TypeError("the decode option must be a boolean")
+            if decode and binary_format:
+                raise ValueError("decode is not allowed with binary format")
+        if columns:
+            if not isinstance(columns, basestring):
+                columns = ','.join('"%s"' % (col,) for col in columns)
+            operation.append('(%s)' % (columns,))
+
+        operation.append("to stdout")
+        if options:
+            operation.append('(%s)' % ','.join(options))
+        operation = ' '.join(operation)
+
+        getdata = self._src.getdata
+        self.execute(operation, params)
+
+        def copy():
+            while True:
+                row = getdata(decode)
+                if isinstance(row, int):
+                    if self.rowcount != row:
+                        self.rowcount = row
+                    break
+                self.rowcount += 1
+                yield row
+
+        if stream is None:
+            # no input stream, return the generator
+            return copy()
+
+        # write the rows to the file-like input stream
+        for row in copy():
+            write(row)
+
+        # return the cursor object, so you can chain operations
+        return self
+
     def __next__(self):
         """Return the next row (support for the iteration protocol)."""
         res = self.fetchone()

Modified: trunk/module/pgmodule.c
==============================================================================
--- trunk/module/pgmodule.c     Sun Jan  3 09:11:21 2016        (r692)
+++ trunk/module/pgmodule.c     Mon Jan  4 17:51:09 2016        (r693)
@@ -2859,6 +2859,216 @@
        return pgsource_move(self, args, QUERY_MOVEPREV);
 }
 
+/* put copy data */
+static char sourcePutData__doc__[] =
+"putdata(buffer) -- send data to server during copy from stdin.";
+
+static PyObject *
+sourcePutData(sourceObject *self, PyObject *args)
+{
+       PyObject   *buffer_obj; /* the buffer object that was passed in */
+       char       *buffer; /* the buffer as encoded string */
+       Py_ssize_t      nbytes; /* length of string */
+       char       *errormsg = NULL; /* error message */
+       int                     res; /* direct result of the operation */
+       PyObject   *ret; /* return value */
+
+       /* checks validity */
+       if (!check_source_obj(self, CHECK_CNX))
+               return NULL;
+
+       /* make sure that the connection object is valid */
+       if (!self->pgcnx->cnx)
+               return NULL;
+
+       if (!PyArg_ParseTuple(args, "O", &buffer_obj))
+               return NULL;
+
+       if (buffer_obj == Py_None) {
+               /* pass None for terminating the operation */
+               buffer = errormsg = NULL;
+               buffer_obj = NULL;
+       }
+       else if (PyBytes_Check(buffer_obj))
+       {
+               /* or pass a byte string */
+               PyBytes_AsStringAndSize(buffer_obj, &buffer, &nbytes);
+               buffer_obj = NULL;
+       }
+       else if (PyUnicode_Check(buffer_obj))
+       {
+               /* or pass a unicode string */
+               buffer_obj = get_encoded_string(
+                       buffer_obj, PQclientEncoding(self->pgcnx->cnx));
+               if (!buffer_obj) return NULL; /* pass the UnicodeEncodeError */
+               PyBytes_AsStringAndSize(buffer_obj, &buffer, &nbytes);
+       }
+       else if (PyErr_GivenExceptionMatches(buffer_obj, PyExc_BaseException))
+       {
+               /* or pass a Python exception for sending an error message */
+               buffer_obj = PyObject_Str(buffer_obj);
+               if (PyUnicode_Check(buffer_obj))
+               {
+                       PyObject *obj = buffer_obj;
+                       buffer_obj = get_encoded_string(
+                               obj, PQclientEncoding(self->pgcnx->cnx));
+                       Py_DECREF(obj);
+                       if (!buffer_obj) return NULL; /* pass the 
UnicodeEncodeError */
+               }
+               errormsg = PyBytes_AsString(buffer_obj);
+               buffer = NULL;
+       }
+       else
+       {
+               PyErr_SetString(PyExc_TypeError,
+                       "putdata() expects a buffer, None or an exception.");
+               return NULL;
+       }
+
+       /* checks validity */
+       if (!check_source_obj(self, CHECK_CNX | CHECK_RESULT) ||
+                       !self->pgcnx->cnx ||
+                       PQresultStatus(self->result) != PGRES_COPY_IN)
+       {
+               PyErr_SetString(PyExc_IOError,
+                       "connection is invalid or not in copy_in state.");
+               Py_XDECREF(buffer_obj);
+               return NULL;
+       }
+
+       if (buffer)
+       {
+               res = nbytes ? PQputCopyData(self->pgcnx->cnx, buffer, nbytes) 
: 1;
+       }
+       else
+       {
+               res = PQputCopyEnd(self->pgcnx->cnx, errormsg);
+       }
+
+       Py_XDECREF(buffer_obj);
+
+       if (res != 1)
+       {
+               PyErr_SetString(PyExc_IOError, 
PQerrorMessage(self->pgcnx->cnx));
+               return NULL;
+       }
+
+       if (buffer) /* buffer has been sent */
+       {
+               ret = Py_None;
+               Py_INCREF(ret);
+       }
+       else /* copy is done */
+       {
+               PGresult   *result; /* final result of the operation */
+
+               Py_BEGIN_ALLOW_THREADS;
+               result = PQgetResult(self->pgcnx->cnx);
+               Py_END_ALLOW_THREADS;
+
+               if (PQresultStatus(result) == PGRES_COMMAND_OK)
+               {
+                       char   *temp;
+                       long    num_rows;
+
+                       temp = PQcmdTuples(result);
+                       num_rows = temp[0] ? atol(temp) : -1;
+                       ret = PyInt_FromLong(num_rows);
+               }
+               else
+               {
+                       if (!errormsg) errormsg = 
PQerrorMessage(self->pgcnx->cnx);
+                       PyErr_SetString(PyExc_IOError, errormsg);
+                       ret = NULL;
+               }
+
+               PQclear(self->result);
+               self->result = NULL;
+               self->result_type = RESULT_EMPTY;
+       }
+
+       return ret; /* None or number of rows */
+}
+
+/* get copy data */
+static char sourceGetData__doc__[] =
+"getdata(decode) -- receive data from server during copy to stdout.";
+
+static PyObject *
+sourceGetData(sourceObject *self, PyObject *args)
+{
+       int                     decode = 0; /* decode flag */
+       char       *buffer; /* the copied buffer as encoded byte string */
+       Py_ssize_t      nbytes; /* length of the byte string */
+       PyObject   *ret; /* return value */
+
+       /* checks validity */
+       if (!check_source_obj(self, CHECK_CNX))
+               return NULL;
+
+       /* make sure that the connection object is valid */
+       if (!self->pgcnx->cnx)
+               return NULL;
+
+       if (!PyArg_ParseTuple(args, "|i", &decode))
+               return NULL;
+
+       /* checks validity */
+       if (!check_source_obj(self, CHECK_CNX | CHECK_RESULT) ||
+                       !self->pgcnx->cnx ||
+                       PQresultStatus(self->result) != PGRES_COPY_OUT)
+       {
+               PyErr_SetString(PyExc_IOError,
+                       "connection is invalid or not in copy_out state.");
+               return NULL;
+       }
+
+       nbytes = PQgetCopyData(self->pgcnx->cnx, &buffer, 0);
+
+       if (!nbytes || nbytes < -1) /* an error occurred */
+       {
+               PyErr_SetString(PyExc_IOError, 
PQerrorMessage(self->pgcnx->cnx));
+               return NULL;
+       }
+
+       if (nbytes == -1) /* copy is done */
+       {
+               PGresult   *result; /* final result of the operation */
+
+               Py_BEGIN_ALLOW_THREADS;
+               result = PQgetResult(self->pgcnx->cnx);
+               Py_END_ALLOW_THREADS;
+
+               if (PQresultStatus(result) == PGRES_COMMAND_OK)
+               {
+                       char   *temp;
+                       long    num_rows;
+
+                       temp = PQcmdTuples(result);
+                       num_rows = temp[0] ? atol(temp) : -1;
+                       ret = PyInt_FromLong(num_rows);
+               }
+               else
+               {
+                       PyErr_SetString(PyExc_IOError, 
PQerrorMessage(self->pgcnx->cnx));
+                       ret = NULL;
+               }
+
+               PQclear(self->result);
+               self->result = NULL;
+               self->result_type = RESULT_EMPTY;
+       }
+       else /* a row has been returned */
+       {
+               ret = decode ? get_decoded_string(
+                               buffer, nbytes, 
PQclientEncoding(self->pgcnx->cnx)) :
+                       PyBytes_FromStringAndSize(buffer, nbytes);
+               PQfreemem(buffer);
+       }
+
+       return ret; /* buffer or number of rows */
+}
+
 /* finds field number from string/integer (internal use only) */
 static int
 sourceFieldindex(sourceObject *self, PyObject *param, const char *usage)
@@ -3041,6 +3251,10 @@
                        sourceMoveNext__doc__},
        {"moveprev", (PyCFunction) sourceMovePrev, METH_VARARGS,
                        sourceMovePrev__doc__},
+       {"putdata", (PyCFunction) sourcePutData, METH_VARARGS,
+                       sourcePutData__doc__},
+       {"getdata", (PyCFunction) sourceGetData, METH_VARARGS,
+                       sourceGetData__doc__},
        {"field", (PyCFunction) sourceField, METH_VARARGS,
                        sourceField__doc__},
        {"fieldinfo", (PyCFunction) sourceFieldInfo, METH_VARARGS,
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql

Reply via email to