Author: cito
Date: Mon Jan 4 17:51:09 2016
New Revision: 693
Log:
Support copy_from() and copy_to() in pgdb
Modified:
trunk/docs/changelog.rst
trunk/docs/pgdb.rst
trunk/module/pgdb.py
trunk/module/pgmodule.c
Modified: trunk/docs/changelog.rst
==============================================================================
--- trunk/docs/changelog.rst Sun Jan 3 09:11:21 2016 (r692)
+++ trunk/docs/changelog.rst Mon Jan 4 17:51:09 2016 (r693)
@@ -16,6 +16,11 @@
constructor functions, this should not cause any incompatibilities.
- The DB-API 2 module now supports the callproc() cursor method. Note
that output parameters are currently not replaced in the return value.
+- The DB-API 2 module now supports copy operations between data streams
+ on the client and database tables via the COPY command of PostgreSQL.
+ The cursor method copy_from() can be used to copy data from the client
+ to the database, and the cursor method copy_to() can be used to copy
+ data from the database to the client.
- The 7-tuples returned by the description attribute of a pgdb cursor
are now named tuples, i.e. their elements can be also accessed by name.
- The tty parameter and attribute of database connections has been
Modified: trunk/docs/pgdb.rst
==============================================================================
--- trunk/docs/pgdb.rst Sun Jan 3 09:11:21 2016 (r692)
+++ trunk/docs/pgdb.rst Mon Jan 4 17:51:09 2016 (r693)
@@ -267,9 +267,10 @@
This read-only attribute specifies the number of rows that the last
:meth:`Cursor.execute` or :meth:`Cursor.executemany` call produced
(for DQL statements like SELECT) or affected (for DML statements like
- UPDATE or INSERT ). The attribute is -1 in case no such method call has
- been performed on the cursor or the rowcount of the last operation
- cannot be determined by the interface.
+ UPDATE or INSERT). It is also set by the :meth:`Cursor.copy_from` and
+ :meth:`Cursor.copy_to` methods. The attribute is -1 in case no such
+ method call has been performed on the cursor or the rowcount of the
+ last operation cannot be determined by the interface.
close -- close the cursor
-------------------------
@@ -436,6 +437,83 @@
The following methods and attributes are not part of the DB-API 2 standard.
+.. method:: Cursor.copy_from(stream, table, [format], [sep], [null], [size],
[columns])
+
+ Copy data from an input stream to the specified table
+
+ :param stream: the input stream
+ (must be a file-like object, a string or an iterable returning strings)
+ :param str table: the name of a database table
+ :param str format: the format of the data in the input stream,
+ can be ``'text'`` (the default), ``'csv'``, or ``'binary'``
+ :param str sep: a single character separator
+ (the default is ``'\t'`` for text and ``','`` for csv)
+ :param str null: the textual representation of the ``NULL`` value,
+ can also be an empty string (the default is ``'\\N'``)
+ :param int size: the size of the buffer when reading file-like objects
+ :param list columns: an optional list of column names
+ :returns: the cursor, so you can chain commands
+
+ :raises TypeError: parameters with wrong types
+ :raises ValueError: invalid parameters
+ :raises IOError: error when executing the copy operation
+
+This method can be used to copy data from an input stream on the client side
+to a database table on the server side using the ``COPY FROM`` command.
+The input stream can be provided in form of a file-like object (which must
+have a ``read()`` method), a string, or an iterable returning one row or
+multiple rows of input data on each iteration.
+
+The format must be text, csv or binary. The sep option sets the column
+separator (delimiter) used in the non binary formats. The null option sets
+the textual representation of ``NULL`` in the input.
+
+The size option sets the size of the buffer used when reading data from
+file-like objects.
+
+The copy operation can be restricted to a subset of columns. If no columns are
+specified, all of them will be copied.
+
+.. method:: Cursor.copy_to(stream, table, [format], [sep], [null], [decode],
[columns])
+
+ Copy data from the specified table to an output stream
+
+ :param stream: the output stream (must be a file-like object or ``None``)
+ :param str table: the name of a database table or a ``SELECT`` query
+ :param str format: the format of the data in the output stream,
+ can be ``'text'`` (the default), ``'csv'``, or ``'binary'``
+ :param str sep: a single character separator
+ (the default is ``'\t'`` for text and ``','`` for csv)
+ :param str null: the textual representation of the ``NULL`` value,
+ can also be an empty string (the default is ``'\\N'``)
+ :param bool decode: whether decoded strings shall be returned
+ for non-binary formats (the default is True in Python 3)
+ :param list columns: an optional list of column names
+ :returns: a generator if stream is set to ``None``, otherwise the cursor
+
+ :raises TypeError: parameters with wrong types
+ :raises ValueError: invalid parameters
+ :raises IOError: error when executing the copy operation
+
+This method can be used to copy data from a database table on the server side
+to an output stream on the client side using the ``COPY TO`` command.
+
+The output stream can be provided in form of a file-like object (which must
+have a ``write()`` method). Alternatively, if ``None`` is passed as the
+output stream, the method will return a generator yielding one row of output
+data on each iteration.
+
+Output will be returned as byte strings unless you set decode to true.
+
+Note that you can also use a ``SELECT`` query instead of the table name.
+
+The format must be text, csv or binary. The sep option sets the column
+separator (delimiter) used in the non binary formats. The null option sets
+the textual representation of ``NULL`` in the output.
+
+The copy operation can be restricted to a subset of columns. If no columns are
+specified, all of them will be copied.
+
.. method:: Cursor.row_factory(row)
Process rows before they are returned
Modified: trunk/module/pgdb.py
==============================================================================
--- trunk/module/pgdb.py Sun Jan 3 09:11:21 2016 (r692)
+++ trunk/module/pgdb.py Mon Jan 4 17:51:09 2016 (r693)
@@ -89,6 +89,7 @@
except NameError: # Python >= 3.0
basestring = (str, bytes)
+from collections import Iterable
try:
from collections import OrderedDict
except ImportError: # Python 2.6 or 3.0
@@ -432,6 +433,243 @@
self.execute(query, parameters)
return parameters
+ def copy_from(self, stream, table,
+ format=None, sep=None, null=None, size=None, columns=None):
+ """Copy data from an input stream to the specified table.
+
+ The input stream can be a file-like object with a read() method or
+ it can also be an iterable returning a row or multiple rows of input
+ on each iteration.
+
+ The format must be text, csv or binary. The sep option sets the
+ column separator (delimiter) used in the non binary formats.
+ The null option sets the textual representation of NULL in the input.
+
+ The size option sets the size of the buffer used when reading data
+ from file-like objects.
+
+ The copy operation can be restricted to a subset of columns. If no
+ columns are specified, all of them will be copied.
+
+ """
+ binary_format = format == 'binary'
+ try:
+ read = stream.read
+ except AttributeError:
+ if size:
+ raise ValueError("size must only be set for file-like objects")
+ if binary_format:
+ input_type = bytes
+ type_name = 'byte strings'
+ else:
+ input_type = basestring
+ type_name = 'strings'
+
+ if isinstance(stream, basestring):
+ if not isinstance(stream, input_type):
+ raise ValueError("the input must be %s" % type_name)
+ if not binary_format:
+ if isinstance(stream, str):
+ if not stream.endswith('\n'):
+ stream += '\n'
+ else:
+ if not stream.endswith(b'\n'):
+ stream += b'\n'
+
+ def chunks():
+ yield stream
+
+ elif isinstance(stream, Iterable):
+
+ def chunks():
+ for chunk in stream:
+ if not isinstance(chunk, input_type):
+ raise ValueError(
+ "input stream must consist of %s" % type_name)
+ if isinstance(chunk, str):
+ if not chunk.endswith('\n'):
+ chunk += '\n'
+ else:
+ if not chunk.endswith(b'\n'):
+ chunk += b'\n'
+ yield chunk
+
+ else:
+ raise TypeError("need an input stream to copy from")
+ else:
+ if size is None:
+ size = 8192
+ if size > 0:
+ if not isinstance(size, int):
+ raise TypeError("the size option must be an integer")
+
+ def chunks():
+ while True:
+ buffer = read(size)
+ yield buffer
+ if not buffer or len(buffer) < size:
+ break
+
+ else:
+
+ def chunks():
+ yield read()
+
+ if not table or not isinstance(table, basestring):
+ raise TypeError("need a table to copy to")
+ if table.lower().startswith('select'):
+ raise ValueError("must specify a table, not a query")
+ else:
+ table = '"%s"' % (table,)
+ operation = ['copy %s' % (table,)]
+ options = []
+ params = []
+ if format is not None:
+ if not isinstance(format, basestring):
+ raise TypeError("the format option must be a string")
+ if format not in ('text', 'csv', 'binary'):
+ raise ValueError("invalid format")
+ options.append('format %s' % (format,))
+ if sep is not None:
+ if not isinstance(sep, basestring):
+ raise TypeError("the sep option must be a string")
+ if format == 'binary':
+ raise ValueError("sep is not allowed with binary format")
+ if len(sep) != 1:
+ raise ValueError("sep must be a single one-byte character")
+ options.append('delimiter %s')
+ params.append(sep)
+ if null is not None:
+ if not isinstance(null, basestring):
+ raise TypeError("the null option must be a string")
+ options.append('null %s')
+ params.append(null)
+ if columns:
+ if not isinstance(columns, basestring):
+ columns = ','.join('"%s"' % (col,) for col in columns)
+ operation.append('(%s)' % (columns,))
+ operation.append("from stdin")
+ if options:
+ operation.append('(%s)' % ','.join(options))
+ operation = ' '.join(operation)
+
+ putdata = self._src.putdata
+ self.execute(operation, params)
+
+ try:
+ for chunk in chunks():
+ putdata(chunk)
+ except BaseException as error:
+ self.rowcount = -1
+ # the following call will re-raise the error
+ putdata(error)
+ else:
+ self.rowcount = putdata(None)
+
+ # return the cursor object, so you can chain operations
+ return self
+
+ def copy_to(self, stream, table,
+ format=None, sep=None, null=None, decode=None, columns=None):
+ """Copy data from the specified table to an output stream.
+
+ The output stream can be a file-like object with a write() method or
+ it can also be None, in which case the method will return a generator
+ yielding a row on each iteration.
+
+ Output will be returned as byte strings unless you set decode to true.
+
+ Note that you can also use a select query instead of the table name.
+
+ The format must be text, csv or binary. The sep option sets the
+ column separator (delimiter) used in the non binary formats.
+ The null option sets the textual representation of NULL in the output.
+
+ The copy operation can be restricted to a subset of columns. If no
+ columns are specified, all of them will be copied.
+
+ """
+ binary_format = format == 'binary'
+ if stream is not None:
+ try:
+ write = stream.write
+ except AttributeError:
+ raise TypeError("need an output stream to copy to")
+ if not table or not isinstance(table, basestring):
+ raise TypeError("need a table to copy to")
+ if table.lower().startswith('select'):
+ if columns:
+ raise ValueError("columns must be specified in the query")
+ table = '(%s)' % (table,)
+ else:
+ table = '"%s"' % (table,)
+ operation = ['copy %s' % (table,)]
+ options = []
+ params = []
+ if format is not None:
+ if not isinstance(format, basestring):
+ raise TypeError("the format option must be a string")
+ if format not in ('text', 'csv', 'binary'):
+ raise ValueError("invalid format")
+ options.append('format %s' % (format,))
+ if sep is not None:
+ if not isinstance(sep, basestring):
+ raise TypeError("the sep option must be a string")
+ if binary_format:
+ raise ValueError("sep is not allowed with binary format")
+ if len(sep) != 1:
+ raise ValueError("sep must be a single one-byte character")
+ options.append('delimiter %s')
+ params.append(sep)
+ if null is not None:
+ if not isinstance(null, basestring):
+ raise TypeError("the null option must be a string")
+ options.append('null %s')
+ params.append(null)
+ if decode is None:
+ if format == 'binary':
+ decode = False
+ else:
+ decode = str is unicode
+ else:
+ if not isinstance(decode, (int, bool)):
+ raise TypeError("the decode option must be a boolean")
+ if decode and binary_format:
+ raise ValueError("decode is not allowed with binary format")
+ if columns:
+ if not isinstance(columns, basestring):
+ columns = ','.join('"%s"' % (col,) for col in columns)
+ operation.append('(%s)' % (columns,))
+
+ operation.append("to stdout")
+ if options:
+ operation.append('(%s)' % ','.join(options))
+ operation = ' '.join(operation)
+
+ getdata = self._src.getdata
+ self.execute(operation, params)
+
+ def copy():
+ while True:
+ row = getdata(decode)
+ if isinstance(row, int):
+ if self.rowcount != row:
+ self.rowcount = row
+ break
+ self.rowcount += 1
+ yield row
+
+ if stream is None:
+ # no input stream, return the generator
+ return copy()
+
+ # write the rows to the file-like input stream
+ for row in copy():
+ write(row)
+
+ # return the cursor object, so you can chain operations
+ return self
+
def __next__(self):
"""Return the next row (support for the iteration protocol)."""
res = self.fetchone()
Modified: trunk/module/pgmodule.c
==============================================================================
--- trunk/module/pgmodule.c Sun Jan 3 09:11:21 2016 (r692)
+++ trunk/module/pgmodule.c Mon Jan 4 17:51:09 2016 (r693)
@@ -2859,6 +2859,216 @@
return pgsource_move(self, args, QUERY_MOVEPREV);
}
+/* put copy data */
+static char sourcePutData__doc__[] =
+"putdata(buffer) -- send data to server during copy from stdin.";
+
+static PyObject *
+sourcePutData(sourceObject *self, PyObject *args)
+{
+ PyObject *buffer_obj; /* the buffer object that was passed in */
+ char *buffer; /* the buffer as encoded string */
+ Py_ssize_t nbytes; /* length of string */
+ char *errormsg = NULL; /* error message */
+ int res; /* direct result of the operation */
+ PyObject *ret; /* return value */
+
+ /* checks validity */
+ if (!check_source_obj(self, CHECK_CNX))
+ return NULL;
+
+ /* make sure that the connection object is valid */
+ if (!self->pgcnx->cnx)
+ return NULL;
+
+ if (!PyArg_ParseTuple(args, "O", &buffer_obj))
+ return NULL;
+
+ if (buffer_obj == Py_None) {
+ /* pass None for terminating the operation */
+ buffer = errormsg = NULL;
+ buffer_obj = NULL;
+ }
+ else if (PyBytes_Check(buffer_obj))
+ {
+ /* or pass a byte string */
+ PyBytes_AsStringAndSize(buffer_obj, &buffer, &nbytes);
+ buffer_obj = NULL;
+ }
+ else if (PyUnicode_Check(buffer_obj))
+ {
+ /* or pass a unicode string */
+ buffer_obj = get_encoded_string(
+ buffer_obj, PQclientEncoding(self->pgcnx->cnx));
+ if (!buffer_obj) return NULL; /* pass the UnicodeEncodeError */
+ PyBytes_AsStringAndSize(buffer_obj, &buffer, &nbytes);
+ }
+ else if (PyErr_GivenExceptionMatches(buffer_obj, PyExc_BaseException))
+ {
+ /* or pass a Python exception for sending an error message */
+ buffer_obj = PyObject_Str(buffer_obj);
+ if (PyUnicode_Check(buffer_obj))
+ {
+ PyObject *obj = buffer_obj;
+ buffer_obj = get_encoded_string(
+ obj, PQclientEncoding(self->pgcnx->cnx));
+ Py_DECREF(obj);
+ if (!buffer_obj) return NULL; /* pass the
UnicodeEncodeError */
+ }
+ errormsg = PyBytes_AsString(buffer_obj);
+ buffer = NULL;
+ }
+ else
+ {
+ PyErr_SetString(PyExc_TypeError,
+ "putdata() expects a buffer, None or an exception.");
+ return NULL;
+ }
+
+ /* checks validity */
+ if (!check_source_obj(self, CHECK_CNX | CHECK_RESULT) ||
+ !self->pgcnx->cnx ||
+ PQresultStatus(self->result) != PGRES_COPY_IN)
+ {
+ PyErr_SetString(PyExc_IOError,
+ "connection is invalid or not in copy_in state.");
+ Py_XDECREF(buffer_obj);
+ return NULL;
+ }
+
+ if (buffer)
+ {
+ res = nbytes ? PQputCopyData(self->pgcnx->cnx, buffer, nbytes)
: 1;
+ }
+ else
+ {
+ res = PQputCopyEnd(self->pgcnx->cnx, errormsg);
+ }
+
+ Py_XDECREF(buffer_obj);
+
+ if (res != 1)
+ {
+ PyErr_SetString(PyExc_IOError,
PQerrorMessage(self->pgcnx->cnx));
+ return NULL;
+ }
+
+ if (buffer) /* buffer has been sent */
+ {
+ ret = Py_None;
+ Py_INCREF(ret);
+ }
+ else /* copy is done */
+ {
+ PGresult *result; /* final result of the operation */
+
+ Py_BEGIN_ALLOW_THREADS;
+ result = PQgetResult(self->pgcnx->cnx);
+ Py_END_ALLOW_THREADS;
+
+ if (PQresultStatus(result) == PGRES_COMMAND_OK)
+ {
+ char *temp;
+ long num_rows;
+
+ temp = PQcmdTuples(result);
+ num_rows = temp[0] ? atol(temp) : -1;
+ ret = PyInt_FromLong(num_rows);
+ }
+ else
+ {
+ if (!errormsg) errormsg =
PQerrorMessage(self->pgcnx->cnx);
+ PyErr_SetString(PyExc_IOError, errormsg);
+ ret = NULL;
+ }
+
+ PQclear(self->result);
+ self->result = NULL;
+ self->result_type = RESULT_EMPTY;
+ }
+
+ return ret; /* None or number of rows */
+}
+
+/* get copy data */
+static char sourceGetData__doc__[] =
+"getdata(decode) -- receive data from server during copy to stdout.";
+
+static PyObject *
+sourceGetData(sourceObject *self, PyObject *args)
+{
+ int *decode = 0; /* decode flag */
+ char *buffer; /* the copied buffer as encoded byte string */
+ Py_ssize_t nbytes; /* length of the byte string */
+ PyObject *ret; /* return value */
+
+ /* checks validity */
+ if (!check_source_obj(self, CHECK_CNX))
+ return NULL;
+
+ /* make sure that the connection object is valid */
+ if (!self->pgcnx->cnx)
+ return NULL;
+
+ if (!PyArg_ParseTuple(args, "|i", &decode))
+ return NULL;
+
+ /* checks validity */
+ if (!check_source_obj(self, CHECK_CNX | CHECK_RESULT) ||
+ !self->pgcnx->cnx ||
+ PQresultStatus(self->result) != PGRES_COPY_OUT)
+ {
+ PyErr_SetString(PyExc_IOError,
+ "connection is invalid or not in copy_out state.");
+ return NULL;
+ }
+
+ nbytes = PQgetCopyData(self->pgcnx->cnx, &buffer, 0);
+
+ if (!nbytes || nbytes < -1) /* an error occurred */
+ {
+ PyErr_SetString(PyExc_IOError,
PQerrorMessage(self->pgcnx->cnx));
+ return NULL;
+ }
+
+ if (nbytes == -1) /* copy is done */
+ {
+ PGresult *result; /* final result of the operation */
+
+ Py_BEGIN_ALLOW_THREADS;
+ result = PQgetResult(self->pgcnx->cnx);
+ Py_END_ALLOW_THREADS;
+
+ if (PQresultStatus(result) == PGRES_COMMAND_OK)
+ {
+ char *temp;
+ long num_rows;
+
+ temp = PQcmdTuples(result);
+ num_rows = temp[0] ? atol(temp) : -1;
+ ret = PyInt_FromLong(num_rows);
+ }
+ else
+ {
+ PyErr_SetString(PyExc_IOError,
PQerrorMessage(self->pgcnx->cnx));
+ ret = NULL;
+ }
+
+ PQclear(self->result);
+ self->result = NULL;
+ self->result_type = RESULT_EMPTY;
+ }
+ else /* a row has been returned */
+ {
+ ret = decode ? get_decoded_string(
+ buffer, nbytes,
PQclientEncoding(self->pgcnx->cnx)) :
+ PyBytes_FromStringAndSize(buffer, nbytes);
+ PQfreemem(buffer);
+ }
+
+ return ret; /* buffer or number of rows */
+}
+
/* finds field number from string/integer (internal use only) */
static int
sourceFieldindex(sourceObject *self, PyObject *param, const char *usage)
@@ -3041,6 +3251,10 @@
sourceMoveNext__doc__},
{"moveprev", (PyCFunction) sourceMovePrev, METH_VARARGS,
sourceMovePrev__doc__},
+ {"putdata", (PyCFunction) sourcePutData, METH_VARARGS,
+ sourcePutData__doc__},
+ {"getdata", (PyCFunction) sourceGetData, METH_VARARGS,
+ sourceGetData__doc__},
{"field", (PyCFunction) sourceField, METH_VARARGS,
sourceField__doc__},
{"fieldinfo", (PyCFunction) sourceFieldInfo, METH_VARARGS,
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql