Author: cito
Date: Sun Jan 31 13:45:58 2016
New Revision: 799
Log:
Improve adaptation and add query_formatted() method
Also added more tests and documentation.
Added:
trunk/docs/contents/pg/adaptation.rst
- copied, changed from r797, trunk/docs/contents/pgdb/adaptation.rst
Modified:
trunk/docs/contents/pg/db_wrapper.rst
trunk/docs/contents/pg/index.rst
trunk/docs/contents/pg/module.rst
trunk/docs/contents/pgdb/adaptation.rst
trunk/docs/contents/pgdb/types.rst
trunk/pg.py
trunk/pgdb.py
trunk/tests/test_classic_dbwrapper.py
trunk/tests/test_dbapi20.py
Copied and modified: trunk/docs/contents/pg/adaptation.rst (from r797,
trunk/docs/contents/pgdb/adaptation.rst)
==============================================================================
--- trunk/docs/contents/pgdb/adaptation.rst Fri Jan 29 17:43:22 2016
(r797, copy source)
+++ trunk/docs/contents/pg/adaptation.rst Sun Jan 31 13:45:58 2016
(r799)
@@ -1,7 +1,7 @@
Remarks on Adaptation and Typecasting
=====================================
-.. py:currentmodule:: pgdb
+.. py:currentmodule:: pg
Both PostgreSQL and Python have the concept of data types, but there
are of course differences between the two type systems. Therefore PyGreSQL
@@ -14,118 +14,132 @@
Adaptation of parameters
------------------------
+When you use the higher level methods of the classic :mod:`pg` module like
+:meth:`DB.insert()` or :meth:`DB.update()`, you don't need to care about
+adaptation of parameters, since all of this is happening automatically behind
+the scenes. You only need to consider this issue when creating SQL commands
+manually and sending them to the database using the :meth:`DB.query` method.
+
+Imagine you have created a user login form that stores the login name as
+*login* and the password as *passwd* and you now want to get the user
+data for that user. You may be tempted to execute a query like this::
+
+ >>> db = pg.DB(...)
+ >>> sql = "SELECT * FROM user_table WHERE login = '%s' AND passwd = '%s'"
+ >>> db.query(sql % (login, passwd)).getresult()[0]
+
+This seems to work at a first glance, but you will notice an error as soon as
+you try to use a login name containing a single quote. Even worse, this error
+can be exploited through a so called "SQL injection", where an attacker inserts
+malicious SQL statements into the query that you never intended to be executed.
+For instance, with a login name something like ``' OR ''='`` the user could
+easily log in and see the user data of another user in the database.
+
+One solution for this problem would be to clean your input from "dangerous"
+characters like the single quote, but this is tedious and it is likely that
+you overlook something or break the application e.g. for users with names
+like "D'Arcy". A better solution is to use the escaping functions provided
+by PostgreSQL which are available as methods on the :class:`DB` object::
+
+ >>> login = "D'Arcy"
+ >>> db.escape_string(login)
+ "D''Arcy"
+
+As you see, :meth:`DB.escape_string` has doubled the single quote which is
+the right thing to do in SQL. However, there are better ways of passing
+parameters to the query, without having to manually escape them. If you
+pass the parameters as positional arguments to :meth:`DB.query`, then
+PyGreSQL will send them to the database separately, without the need for
+quoting them inside the SQL command, and without the problems inherent with
+that process. In this case you must put placeholders of the form ``$1``,
+``$2`` etc. in the SQL command in place of the parameters that should go there.
+For instance::
-PyGreSQL knows how to adapt the common Python types to get a suitable
-representation of their values for PostgreSQL when you pass parameters
-to a query. For example::
-
- >>> con = pgdb.connect(...)
- >>> cur = con.cursor()
- >>> parameters = (144, 3.75, 'hello', None)
- >>> tuple(cur.execute('SELECT %s, %s, %s, %s', parameters).fetchone()
- (144, Decimal('3.75'), 'hello', None)
+ >>> sql = "SELECT * FROM user_table WHERE login = $1 AND passwd = $2"
+ >>> db.query(sql, login, passwd).getresult()[0]
-This is the result we can expect, so obviously PyGreSQL has adapted the
-parameters and sent the following query to PostgreSQL:
-
-.. code-block:: sql
-
- SELECT 144, 3.75, 'hello', NULL
-
-Note the subtle, but important detail that even though the SQL string passed
-to :meth:`cur.execute` contains conversion specifications normally used in
-Python with the ``%`` operator for formatting strings, we didn't use the ``%``
-operator to format the parameters, but passed them as the second argument to
-:meth:`cur.execute`. I.e. we **didn't** write the following::
-
->>> tuple(cur.execute('SELECT %s, %s, %s, %s' % parameters).fetchone()
-
-If we had done this, PostgreSQL would have complained because the parameters
-were not adapted. Particularly, there would be no quotes around the value
-``'hello'``, so PostgreSQL would have interpreted this as a database column,
-which would have caused a :exc:`ProgrammingError`. Also, the Python value
-``None`` would have been included in the SQL command literally, instead of
-being converted to the SQL keyword ``NULL``, which would have been another
-reason for PostgreSQL to complain about our bad query:
-
-.. code-block:: sql
-
- SELECT 144, 3.75, hello, None
-
-Even worse, building queries with the use of the ``%`` operator makes us
-vulnerable to so called "SQL injection" exploits, where an attacker inserts
-malicious SQL statements into our queries that we never intended to be
-executed. We could avoid this by carefully quoting and escaping the
-parameters, but this would be tedious and if we overlook something, our
-code will still be vulnerable. So please don't do this. This cannot be
-emphasized enough, because it is such a subtle difference and using the ``%``
-operator looks so natural:
+That's much better. So please always keep the following warning in mind:
.. warning::
Remember to **never** insert parameters directly into your queries using
the ``%`` operator. Always pass the parameters separately.
-The good thing is that by letting PyGreSQL do the work for you, you can treat
-all your parameters equally and don't need to ponder where you need to put
-quotes or need to escape strings. You can and should also always use the
-general ``%s`` specification instead of e.g. using ``%d`` for integers.
-Actually, to avoid mistakes and make it easier to insert parameters at more
-than one location, you can and should use named specifications, like this::
-
- >>> params = dict(greeting='Hello', name='HAL')
- >>> sql = """SELECT %(greeting)s || ', ' || %(name)s
- ... || '. Do you read me, ' || %(name)s || '?'"""
- >>> cur.execute(sql, params).fetchone()[0]
- 'Hello, HAL. Do you read me, HAL?'
-
-PyGreSQL does not only adapt the basic types like ``int``, ``float``,
-``bool`` and ``str``, but also tries to make sense of Python lists and tuples.
+If you like the ``%`` format specifications of Python better than the
+placeholders used by PostgreSQL, there is still a way to use them, via the
+:meth:`DB.query_formatted` method::
+
+ >>> sql = "SELECT * FROM user_table WHERE login = %s AND passwd = %s"
+ >>> db.query_formatted(sql, (login, passwd)).getresult()[0]
+
+Note that we need to pass the parameters not as positional arguments here,
+but as a single tuple. Also note again that we did not use the ``%``
+operator of Python to format the SQL string, we just used the ``%s`` format
+specifications of Python and let PyGreSQL care about the formatting.
+Even better, you can also pass the parameters as a dictionary if you use
+the :meth:`DB.query_formatted` method::
+
+ >>> sql = """SELECT * FROM user_table
+ ... WHERE login = %(login)s AND passwd = %(passwd)s"""
+ >>> parameters = dict(login=login, passwd=passwd)
+ >>> db.query_formatted(sql, parameters).getresult()[0]
+
+Here is another example::
+
+ >>> sql = "SELECT 'Hello, ' || %s || '!'"
+ >>> db.query_formatted(sql, (login,)).getresult()[0]
+
+You would think that the following even simpler example should work, too:
+
+ >>> sql = "SELECT %s"
+ >>> db.query_formatted(sql, (login,)).getresult()[0]
+ ProgrammingError: Could not determine data type of parameter $1
+
+The issue here is that :meth:`DB.query_formatted` by default still uses
+PostgreSQL parameters, transforming the Python style ``%s`` placeholder
+into a ``$1`` placeholder, and sending the login name separately from
+the query. In the query we looked at before, the concatenation with other
+strings made it clear that it should be interpreted as a string. This simple
+query however does not give PostgreSQL a clue what data type the ``$1``
+placeholder stands for.
+
+This is different when you are embedding the login name directly into the
+query instead of passing it as parameter to PostgreSQL. You can achieve this
+by setting the *inline* parameter of :meth:`DB.query_formatted`, like so::
+
+ >>> sql = "SELECT %s"
+ >>> db.query_formatted(sql, (login,), inline=True).getresult()[0]
+
+Another way of making this query work while still sending the parameters
+separately is to simply cast the parameter values::
+
+ >>> sql = "SELECT %s::text"
+ >>> db.query_formatted(sql, (login,), inline=False).getresult()[0]
+
+In real world examples you will rarely have to cast your parameters like that,
+since in an INSERT statement or a WHERE clause comparing the parameter to a
+table column the data type will be clear from the context.
+
+When binding the parameters to a query, PyGreSQL does not only adapt the basic
+types like ``int``, ``float``, ``bool`` and ``str``, but also tries to make
+sense of Python lists and tuples.
Lists are adapted as PostgreSQL arrays::
>>> params = dict(array=[[1, 2],[3, 4]])
- >>> cur.execute("SELECT %(array)s", params).fetchone()[0]
+ >>> db.query_formatted("SELECT %(array)s::int[]", params).getresult()[0][0]
[[1, 2], [3, 4]]
-Note that the query gives the value back as Python lists again. This
+Note that again we only need to cast the array parameter or use inline
+parameters because this simple query does not provide enough context.
+Also note that the query gives the value back as Python lists again. This
is achieved by the typecasting mechanism explained in the next section.
-The query that was actually executed was this:
-
-.. code-block:: sql
-
- SELECT ARRAY[[1,2],[3,4]]
-
-Again, if we had inserted the list using the ``%`` operator without adaptation,
-the ``ARRAY`` keyword would have been missing in the query.
-
-Tuples are adapted as PostgreSQL composite types::
- >>> params = dict(record=('Bond', 'James'))
- >>> cur.execute("SELECT %(record)s", params).fetchone()[0]
- ('Bond', 'James')
-
-You can also use this feature with the ``IN`` syntax of SQL::
-
- >>> params = dict(what='needle', where=('needle', 'haystack'))
- >>> cur.execute("SELECT %(what)s IN %(where)s", params).fetchone()[0]
- True
-
-Sometimes a Python type can be ambiguous. For instance, you might want
-to insert a Python list not into an array column, but into a JSON column.
-Or you want to interpret a string as a date and insert it into a DATE column.
-In this case you can give PyGreSQL a hint by using :ref:`type_constructors`::
-
- >>> cur.execute("CREATE TABLE json_data (data json, created date)")
- >>> params = dict(
- ... data=pgdb.Json([1, 2, 3]), created=pgdb.Date(2016, 1, 29))
- >>> sql = ("INSERT INTO json_data VALUES (%(data)s, %(created)s)")
- >>> cur.execute(sql, params)
- >>> cur.execute("SELECT * FROM json_data").fetchone()
- Row(data=[1, 2, 3], created='2016-01-29')
+Tuples are adapted as PostgreSQL composite types. If you use inline paramters,
+they can also be used with the ``IN`` syntax.
-Let's think of another example where we create a table with a composite
-type in PostgreSQL:
+Let's think of a more real world example again where we create a table with a
+composite type in PostgreSQL:
.. code-block:: sql
@@ -151,12 +165,19 @@
Using the automatic adaptation of Python tuples, an item can now be
inserted into the database and then read back as follows::
- >>> cur.execute("INSERT INTO on_hand VALUES (%(item)s, %(count)s)",
+ >>> db.query_formatted("INSERT INTO on_hand VALUES (%(item)s, %(count)s)",
... dict(item=inventory_item('fuzzy dice', 42, 1.99), count=1000))
- >>> cur.execute("SELECT * FROM on_hand").fetchone()
+ >>> db.query("SELECT * FROM on_hand").getresult()[0][0]
Row(item=inventory_item(name='fuzzy dice', supplier_id=42,
price=Decimal('1.99')), count=1000)
+The :meth:`DB.insert` method provides a simpler way to achieve the same::
+
+ >>> row = dict(item=inventory_item('fuzzy dice', 42, 1.99), count=1000)
+ >>> db.insert('on_hand', row)
+ {'count': 1000, 'item': inventory_item(name='fuzzy dice',
+ supplier_id=42, price=Decimal('1.99'))}
+
However, we may not want to use named tuples, but custom Python classes
to hold our values, like this one::
@@ -172,20 +193,13 @@
... self.name, self.supplier_id, self.price)
But when we try to insert an instance of this class in the same way, we
-will get an error::
+will get an error. This is because PyGreSQL tries to pass the string
+representation of the object as a parameter to PostgreSQL, but this is just a
+human readable string and not useful for PostgreSQL to build a composite type.
+However, it is possible to make such custom classes adapt themselves to
+PostgreSQL by adding a "magic" method with the name ``__pg_str__``, like so::
- >>> cur.execute("INSERT INTO on_hand VALUES (%(item)s, %(count)s)",
- ... dict(item=InventoryItem('fuzzy dice', 42, 1.99), count=1000))
- InterfaceError: Do not know how to adapt type <class 'InventoryItem'>
-
-While PyGreSQL knows how to adapt tuples, it does not know what to make out
-of our custom class. To simply convert the object to a string using the
-``str`` function is not a solution, since this yields a human readable string
-that is not useful for PostgreSQL. However, it is possible to make such
-custom classes adapt themselves to PostgreSQL by adding a "magic" method
-with the name ``__pg_repr__``, like this::
-
- >>> class InventoryItem:
+ >>> class InventoryItem:
...
... ...
...
@@ -193,77 +207,94 @@
... return '%s (from %s, at $%s)' % (
... self.name, self.supplier_id, self.price)
...
- ... def __pg_repr__(self):
+ ... def __pg_str__(self, typ):
... return (self.name, self.supplier_id, self.price)
Now you can insert class instances the same way as you insert named tuples.
+You can even make these objects adapt to different types in different ways::
-Note that PyGreSQL adapts the result of ``__pg_repr__`` again if it is a
-tuple or a list. Otherwise, it must be a properly escaped string.
+ >>> class InventoryItem:
+ ...
+ ... ...
+ ...
+ ... def __pg_str__(self, typ):
+ ... if typ == 'text':
+ ... return str(self)
+ ... return (self.name, self.supplier_id, self.price)
+ ...
+ >>> db.query("ALTER TABLE on_hand ADD COLUMN remark varchar")
+ >>> item=InventoryItem('fuzzy dice', 42, 1.99)
+ >>> row = dict(item=item, remark=item, count=1000)
+ >>> db.insert('on_hand', row)
+ {'count': 1000, 'item': inventory_item(name='fuzzy dice',
+ supplier_id=42, price=Decimal('1.99')),
+ 'remark': 'fuzzy dice (from 42, at $1.99)'}
+
+There is also another "magic" method ``__pg_repr__`` which does not take the
+*typ* parameter. That method is used instead of ``__pg_str__`` when passing
+parameters inline. You must be more careful when using ``__pg_repr__``,
+because it must return a properly escaped string that can be put literally
+inside the SQL. The only exception is when you return a tuple or list,
+because these will be adapted and properly escaped by PyGreSQL again.
Typecasting to Python
---------------------
As you noticed, PyGreSQL automatically converted the PostgreSQL data to
-suitable Python objects when returning values via one of the "fetch" methods
-of a cursor. This is done by the use of built-in typecast functions.
+suitable Python objects when returning values via the :meth:`DB.get()`,
+:meth:`Query.getresult()` and similar methods. This is done by the use
+of built-in typecast functions.
-If you want to use different typecast functions or add your own if no
+If you want to use different typecast functions or add your own if no
built-in typecast function is available, then this is possible using
-the :func:`set_typecast` function. With the :func:`get_typecast` method
-you can check which function is currently set, and :func:`reset_typecast`
-allows you to reset the typecast function to its default. If no typecast
-function is set, then PyGreSQL will return the raw strings from the database.
+the :func:`set_typecast` function. With the :func:`get_typecast` function
+you can check which function is currently set. If no typecast function
+is set, then PyGreSQL will return the raw strings from the database.
For instance, you will find that PyGreSQL uses the normal ``int`` function
to cast PostgreSQL ``int4`` type values to Python::
- >>> pgdb.get_typecast('int4')
+ >>> pg.get_typecast('int4')
int
-You can change this to return float values instead::
+In the classic PyGreSQL module, the typecasting for these basic types is
+always done internally by the C extension module for performance reasons.
+We can set a different typecast function for ``int4``, but it will not
+become effective, the C module continues to use its internal typecasting.
+
+However, we can add new typecast functions for the database types that are
+not supported by the C modul. Fore example, we can create a typecast function
+that casts items of the composite PostgreSQL type used as example in the
+previous section to instances of the corresponding Python class.
+
+To do this, at first we get the default typecast function that PyGreSQL has
+created for the current :class:`DB` connection. This default function casts
+composite types to named tuples, as we have seen in the section before.
+We can grab it from the :attr:`DB.dbtypes` object as follows::
+
+ >>> cast_tuple = db.dbtypes.get_typecast('inventory_item')
- >>> pgdb.set_typecast('int4', float)
- >>> con = pgdb.connect(...)
- >>> cur = con.cursor()
- >>> cur.execute('select 42::int4').fetchone()[0]
- 42.0
+Now we can create a new typecast function that converts the tuple to
+an instance of our custom class::
-Note that the connections cache typecast functions, so you may need to
-reopen the database connection, or reset the cache of the connection to
-make this effective, using the following command::
+ >>> cast_item = lambda value: InventoryItem(*cast_tuple(value))
- >>> con.type_cache.reset_typecast()
+Finally, we set this typecast function, either globally with
+:func:`set_typecast`, or locally for the current connection like this::
-The :class:`TypeCache` of the connection can also be used to change typecast
-functions locally for one database connection only.
+ >>> db.dbtypes.set_typecast('inventory_item', cast_item)
-As a more useful example, we can create a typecast function that casts
-items of the composite type used as example in the previous section
-to instances of the corresponding Python class::
+Now we can get instances of our custom class directly from the database::
- >>> con.type_cache.reset_typecast()
- >>> cast_tuple = con.type_cache.get_typecast('inventory_item')
- >>> cast_item = lambda value: InventoryItem(*cast_tuple(value))
- >>> con.type_cache.set_typecast('inventory_item', cast_item)
- >>> str(cur.execute("SELECT * FROM on_hand").fetchone()[0])
+ >>> item = db.query("SELECT * FROM on_hand").getresult()[0][0]
+ >>> str(item)
'fuzzy dice (from 42, at $1.99)'
-As you saw in the last section you, PyGreSQL also has a typecast function
-for JSON, which is the default JSON decoder from the standard library.
-Let's assume we want to use a slight variation of that decoder in which
-every integer in JSON is converted to a float in Python. This can be
-accomplished as follows::
-
- >>> from json import loads
- >>> cast_json = lambda v: loads(v, parse_int=float)
- >>> pgdb.set_typecast('json', cast_json)
- >>> cur.execute("SELECT data FROM json_data").fetchone()[0]
- [1.0, 2.0, 3.0]
-
-Note again that you may need to ``type_cache.reset_typecast()`` to make
-this effective. Also note that the two types ``json`` and ``jsonb`` have
-their own typecast functions, so if you use ``jsonb`` instead of ``json``,
-you need to use this type name when setting the typecast function::
-
- >>> pgdb.set_typecast('jsonb', cast_json)
+Note that some of the typecast functions used by the C module are configurable
+with separate module level functions, such as :meth:`set_decimal`,
+:meth:`set_bool` or :meth:`set_jsondecode`. You need to use these instead of
+:meth:`set_typecast` if you want to change the behavior of the C module.
+
+Also note that after changing global typecast functions with
+:meth:`set_typecast`, you may need to run ``db.dbtypes.reset_typecast()``
+to make these changes effective on connections that were already open.
Modified: trunk/docs/contents/pg/db_wrapper.rst
==============================================================================
--- trunk/docs/contents/pg/db_wrapper.rst Sat Jan 30 14:55:18 2016
(r798)
+++ trunk/docs/contents/pg/db_wrapper.rst Sun Jan 31 13:45:58 2016
(r799)
@@ -452,6 +452,47 @@
rows = db.query("update employees set phone=$2 where name=$1",
name, phone).getresult()[0][0]
+query_formatted -- execute a formatted SQL command string
+---------------------------------------------------------
+
+.. method:: DB.query_formatted(command, parameters, [types], [inline])
+
+ Execute a formatted SQL command string
+
+ :param str command: SQL command
+ :param parameters: the values of the parameters for the SQL command
+ :type parameters: tuple, list or dict
+ :param types: optionally, the types of the parameters
+ :type types: tuple, list or dict
+ :param bool inline: whether the parameters should be passed in the SQL
+ :rtype: :class:`Query`, None
+ :raises TypeError: bad argument type, or too many arguments
+ :raises TypeError: invalid connection
+ :raises ValueError: empty SQL query or lost connection
+ :raises pg.ProgrammingError: error in query
+ :raises pg.InternalError: error during query processing
+
+Similar to :meth:`DB.query`, but using Python format placeholders of the form
+``%s`` or ``%(names)s`` instead of PostgreSQL placeholders of the form ``$1``.
+The parameters must be passed as a tuple, list or dict. You can also pass a
+corresponding tuple, list or dict of database types in order to format the
+parameters properly in case there is ambiguity.
+
+If you set *inline* to True, the parameters will be sent to the database
+embedded in the SQL command, otherwise they will be sent separately.
+
+Example::
+
+ name = input("Name? ")
+ phone = input("Phone? ")
+ rows = db.query_formatted(
+ "update employees set phone=%s where name=%s",
+ (phone, name)).getresult()[0][0]
+ # or
+ rows = db.query_formatted(
+ "update employees set phone=%(phone)s where name=%(name)s",
+ dict(name=name, phone=phone)).getresult()[0][0]
+
clear -- clear row values in memory
-----------------------------------
@@ -779,7 +820,6 @@
The name of the database that the connection is using
-
.. attribute:: DB.dbtypes
A dictionary with the various type names for the PostgreSQL types
@@ -789,3 +829,12 @@
description of the :class:`DbTypes` class for details.
.. versionadded:: 5.0
+
+.. attribute:: DB.adapter
+
+ A class with some helper functions for adapting parameters
+
+This can be used for building queries with parameters. You normally will
+not need this, as you can use the :class:`DB.query_formatted` method.
+
+.. versionadded:: 5.0
Modified: trunk/docs/contents/pg/index.rst
==============================================================================
--- trunk/docs/contents/pg/index.rst Sat Jan 30 14:55:18 2016 (r798)
+++ trunk/docs/contents/pg/index.rst Sun Jan 31 13:45:58 2016 (r799)
@@ -16,3 +16,4 @@
large_objects
notification
db_types
+ adaptation
Modified: trunk/docs/contents/pg/module.rst
==============================================================================
--- trunk/docs/contents/pg/module.rst Sat Jan 30 14:55:18 2016 (r798)
+++ trunk/docs/contents/pg/module.rst Sun Jan 31 13:45:58 2016 (r799)
@@ -499,9 +499,28 @@
the above methods are only called for the types that are not already
supported by the C extension module.
+Type helpers
+------------
+
+The module provides the following type helper functions. You can wrap
+parameters with these functions when passing them to :meth:`DB.query_formatted`
+in order to give PyGreSQL a hint about the type of the parameters.
+
+.. function:: Bytea(bytes)
+
+ A wrapper for holding a bytea value
+
+.. function:: Json(obj)
+
+ A wrapper for holding an object serializable to JSON
+
+.. function:: Literal(sql)
+
+ A wrapper for holding a literal SQL string
Module constants
----------------
+
Some constants are defined in the module dictionary.
They are intended to be used as parameters for methods calls.
You should refer to the libpq description in the PostgreSQL user manual
Modified: trunk/docs/contents/pgdb/adaptation.rst
==============================================================================
--- trunk/docs/contents/pgdb/adaptation.rst Sat Jan 30 14:55:18 2016
(r798)
+++ trunk/docs/contents/pgdb/adaptation.rst Sun Jan 31 13:45:58 2016
(r799)
@@ -208,9 +208,9 @@
suitable Python objects when returning values via one of the "fetch" methods
of a cursor. This is done by the use of built-in typecast functions.
-If you want to use different typecast functions or add your own if no
+If you want to use different typecast functions or add your own if no
built-in typecast function is available, then this is possible using
-the :func:`set_typecast` function. With the :func:`get_typecast` method
+the :func:`set_typecast` function. With the :func:`get_typecast` function
you can check which function is currently set, and :func:`reset_typecast`
allows you to reset the typecast function to its default. If no typecast
function is set, then PyGreSQL will return the raw strings from the database.
@@ -229,7 +229,7 @@
>>> cur.execute('select 42::int4').fetchone()[0]
42.0
-Note that the connections cache typecast functions, so you may need to
+Note that the connections cache the typecast functions, so you may need to
reopen the database connection, or reset the cache of the connection to
make this effective, using the following command::
@@ -261,9 +261,9 @@
>>> cur.execute("SELECT data FROM json_data").fetchone()[0]
[1.0, 2.0, 3.0]
-Note again that you may need to ``type_cache.reset_typecast()`` to make
-this effective. Also note that the two types ``json`` and ``jsonb`` have
-their own typecast functions, so if you use ``jsonb`` instead of ``json``,
-you need to use this type name when setting the typecast function::
+Note again that you may need to run ``con.type_cache.reset_typecast()`` to
+make this effective. Also note that the two types ``json`` and ``jsonb`` have
+their own typecast functions, so if you use ``jsonb`` instead of ``json``, you
+need to use this type name when setting the typecast function::
>>> pgdb.set_typecast('jsonb', cast_json)
Modified: trunk/docs/contents/pgdb/types.rst
==============================================================================
--- trunk/docs/contents/pgdb/types.rst Sat Jan 30 14:55:18 2016 (r798)
+++ trunk/docs/contents/pgdb/types.rst Sun Jan 31 13:45:58 2016 (r799)
@@ -54,13 +54,21 @@
Additionally, PyGreSQL provides the following constructors for PostgreSQL
specific data types:
+.. function:: Bytea(bytes)
+
+ Construct an object capable of holding a bytea value
+
.. function:: Json(obj, [encode])
- Construct a wrapper for holding an object serializable to JSON.
+ Construct a wrapper for holding an object serializable to JSON
You can pass an optional serialization function as a parameter.
By default, PyGreSQL uses :func:`json.dumps` to serialize it.
+.. function:: Literal(sql)
+
+ Construct a wrapper for holding a literal SQL string
+
Example for using a type constructor::
>>> cursor.execute("create table jsondata (data jsonb)")
Modified: trunk/pg.py
==============================================================================
--- trunk/pg.py Sat Jan 30 14:55:18 2016 (r798)
+++ trunk/pg.py Sun Jan 31 13:45:58 2016 (r799)
@@ -34,9 +34,10 @@
import select
import warnings
+from datetime import date, time, datetime, timedelta
from decimal import Decimal
+from math import isnan, isinf
from collections import namedtuple
-from functools import partial
from operator import itemgetter
from re import compile as regex
from json import loads as jsondecode, dumps as jsonencode
@@ -144,7 +145,7 @@
return 'oid(%s)' % table
-class _SimpleType(dict):
+class _SimpleTypes(dict):
"""Dictionary mapping pg_type names to simple type names."""
_types = {'bool': 'bool',
@@ -168,18 +169,55 @@
def __missing__(key):
return 'text'
-_simpletype = _SimpleType()
+_simpletypes = _SimpleTypes()
-class _Adapt:
- """Mixin providing methods for adapting records and record elements.
+def _quote_if_unqualified(param, name):
+ """Quote parameter representing a qualified name.
- This is used when passing values from one of the higher level DB
- methods as parameters for a query.
-
- This class must be mixed in to a connection class, because it needs
- connection specific methods such as escape_bytea().
+ Puts a quote_ident() call around the give parameter unless
+ the name contains a dot, in which case the name is ambiguous
+ (could be a qualified name or just a name with a dot in it)
+ and must be quoted manually by the caller.
"""
+ if isinstance(name, basestring) and '.' not in name:
+ return 'quote_ident(%s)' % (param,)
+ return param
+
+
+class _ParameterList(list):
+ """Helper class for building typed parameter lists."""
+
+ def add(self, value, typ=None):
+ """Typecast value with known database type and build parameter list.
+
+ If this is a literal value, it will be returned as is. Otherwise, a
+ placeholder will be returned and the parameter list will be augmented.
+ """
+ value = self.adapt(value, typ)
+ if isinstance(value, Literal):
+ return value
+ self.append(value)
+ return '$%d' % len(self)
+
+
+class Literal(str):
+ """Wrapper class for marking literal SQL values."""
+
+
+class Json:
+ """Wrapper class for marking Json values."""
+
+ def __init__(self, obj):
+ self.obj = obj
+
+
+class Bytea(bytes):
+ """Wrapper class for marking Bytea values."""
+
+
+class Adapter:
+ """Class providing methods for adapting parameters to the database."""
_bool_true_values = frozenset('t true 1 y yes on'.split())
@@ -190,6 +228,13 @@
_re_record_quote = regex(r'[(,"\\]')
_re_array_escape = _re_record_escape = regex(r'(["\\])')
+ def __init__(self, db):
+ self.db = db
+ self.encode_json = db.encode_json
+ db = db.db
+ self.escape_bytea = db.escape_bytea
+ self.escape_string = db.escape_string
+
@classmethod
def _adapt_bool(cls, v):
"""Adapt a boolean parameter."""
@@ -205,7 +250,7 @@
if not v:
return None
if isinstance(v, basestring) and v.lower() in cls._date_literals:
- return _Literal(v)
+ return Literal(v)
return v
@staticmethod
@@ -297,71 +342,39 @@
def _adapt_record(self, v, typ):
"""Adapt a record parameter with given type."""
- typ = typ.attnames.values()
+ typ = self.get_attnames(typ).values()
if len(typ) != len(v):
raise TypeError('Record parameter %s has wrong size' % v)
- return '(%s)' % ','.join(getattr(self,
- '_adapt_record_%s' % t.simple)(v) for v, t in zip(v, typ))
-
- @classmethod
- def _adapt_record_text(cls, v):
- """Adapt a text type record component."""
- if v is None:
- return ''
- if not v:
- return '""'
- v = str(v)
- if cls._re_record_quote.search(v):
- v = '"%s"' % cls._re_record_escape.sub(r'\\\1', v)
- return v
-
- _adapt_record_date = _adapt_record_text
-
- @classmethod
- def _adapt_record_bool(cls, v):
- """Adapt a boolean record component."""
- if v is None:
- return ''
- if isinstance(v, basestring):
- if not v:
- return ''
- v = v.lower() in cls._bool_true_values
- return 't' if v else 'f'
-
- @staticmethod
- def _adapt_record_num(v):
- """Adapt a numeric record component."""
- if not v and v != 0:
- return ''
- return str(v)
-
- _adapt_record_int = _adapt_record_float = _adapt_record_money = \
- _adapt_record_num
-
- def _adapt_record_bytea(self, v):
- if v is None:
- return ''
- v = self.escape_bytea(v)
- if bytes is not str and isinstance(v, bytes):
- v = v.decode('ascii')
- return v.replace('\\', '\\\\')
-
- def _adapt_record_json(self, v):
- """Adapt a bytea record component."""
- if not v:
- return ''
- if not isinstance(v, basestring):
- v = self.encode_json(v)
- if self._re_array_quote.search(v):
- v = '"%s"' % self._re_array_escape.sub(r'\\\1', v)
- return v
-
- def _adapt_param(self, value, typ, params):
- """Adapt and add a parameter to the list."""
- if isinstance(value, _Literal):
- return value
- if value is not None:
- simple = typ.simple
+ adapt = self.adapt
+ value = []
+ for v, t in zip(v, typ):
+ v = adapt(v, t)
+ if v is None:
+ v = ''
+ elif not v:
+ v = '""'
+ else:
+ if isinstance(v, bytes):
+ if str is not bytes:
+ v = v.decode('ascii')
+ else:
+ v = str(v)
+ if self._re_record_quote.search(v):
+ v = '"%s"' % self._re_record_escape.sub(r'\\\1', v)
+ value.append(v)
+ return '(%s)' % ','.join(value)
+
+ def adapt(self, value, typ=None):
+ """Adapt a value with known database type."""
+ if value is not None and not isinstance(value, Literal):
+ if typ:
+ simple = self.get_simple_name(typ)
+ else:
+ typ = simple = self.guess_simple_type(value) or 'text'
+ try:
+ value = value.__pg_str__(typ)
+ except AttributeError:
+ pass
if simple == 'text':
pass
elif simple == 'record':
@@ -374,10 +387,167 @@
else:
adapt = getattr(self, '_adapt_%s' % simple)
value = adapt(value)
- if isinstance(value, _Literal):
- return value
- params.append(value)
- return '$%d' % len(params)
+ return value
+
+ @staticmethod
+ def simple_type(name):
+ """Create a simple database type with given attribute names."""
+ typ = DbType(name)
+ typ.simple = name
+ return typ
+
+ @staticmethod
+ def get_simple_name(typ):
+ """Get the simple name of a database type."""
+ if isinstance(typ, DbType):
+ return typ.simple
+ return _simpletypes[typ]
+
+ @staticmethod
+ def get_attnames(typ):
+ """Get the attribute names of a composite database type."""
+ if isinstance(typ, DbType):
+ return typ.attnames
+ return {}
+
+ @classmethod
+ def guess_simple_type(cls, value):
+ """Try to guess which database type the given value has."""
+ if isinstance(value, Bytea):
+ return 'bytea'
+ if isinstance(value, basestring):
+ return 'text'
+ if isinstance(value, bool):
+ return 'bool'
+ if isinstance(value, (int, long)):
+ return 'int'
+ if isinstance(value, float):
+ return 'float'
+ if isinstance(value, Decimal):
+ return 'num'
+ if isinstance(value, (date, time, datetime, timedelta)):
+ return 'date'
+ if isinstance(value, list):
+ return '%s[]' % cls.guess_simple_base_type(value)
+ if isinstance(value, tuple):
+ simple_type = cls.simple_type
+ typ = simple_type('record')
+ guess = cls.guess_simple_type
+ typ._get_attnames = lambda _self: AttrDict(
+ (str(n + 1), simple_type(guess(v)))
+ for n, v in enumerate(value))
+ return typ
+
+ @classmethod
+ def guess_simple_base_type(cls, value):
+ """Try to guess the base type of a given array."""
+ for v in value:
+ if isinstance(v, list):
+ typ = cls.guess_simple_base_type(v)
+ else:
+ typ = cls.guess_simple_type(v)
+ if typ:
+ return typ
+
+ def adapt_inline(self, value, nested=False):
+ """Adapt a value that is put into the SQL and needs to be quoted."""
+ if value is None:
+ return 'NULL'
+ if isinstance(value, Literal):
+ return value
+ if isinstance(value, Bytea):
+ value = self.escape_bytea(value)
+ if bytes is not str: # Python >= 3.0
+ value = value.decode('ascii')
+ elif isinstance(value, Json):
+ if value.encode:
+ return value.encode()
+ value = self.encode_json(value)
+ elif isinstance(value, (datetime, date, time, timedelta)):
+ value = str(value)
+ if isinstance(value, basestring):
+ value = self.escape_string(value)
+ return "'%s'" % value
+ if isinstance(value, bool):
+ return 'true' if value else 'false'
+ if isinstance(value, float):
+ if isinf(value):
+ return "'-Infinity'" if value < 0 else "'Infinity'"
+ if isnan(value):
+ return "'NaN'"
+ return value
+ if isinstance(value, (int, long, Decimal)):
+ return value
+ if isinstance(value, list):
+ q = self.adapt_inline
+ s = '[%s]' if nested else 'ARRAY[%s]'
+ return s % ','.join(str(q(v, nested=True)) for v in value)
+ if isinstance(value, tuple):
+ q = self.adapt_inline
+ return '(%s)' % ','.join(str(q(v)) for v in value)
+ try:
+ value = value.__pg_repr__()
+ except AttributeError:
+ raise InterfaceError(
+ 'Do not know how to adapt type %s' % type(value))
+ if isinstance(value, (tuple, list)):
+ value = self.adapt_inline(value)
+ return value
+
+ def parameter_list(self):
+ """Return a parameter list for parameters with known database types.
+
+ The list has an add(value, typ) method that will build up the
+ list and return either the literal value or a placeholder.
+ """
+ params = _ParameterList()
+ params.adapt = self.adapt
+ return params
+
+ def format_query(self, command, values, types=None, inline=False):
+ """Format a database query using the given values and types."""
+ if inline and types:
+ raise ValueError('Typed parameters must be sent separately')
+ params = self.parameter_list()
+ if isinstance(values, (list, tuple)):
+ if inline:
+ adapt = self.adapt_inline
+ literals = [adapt(value) for value in values]
+ else:
+ add = params.add
+ literals = []
+ append = literals.append
+ if types:
+ if (not isinstance(types, (list, tuple)) or
+ len(types) != len(values)):
+ raise TypeError('The values and types do not match')
+ for value, typ in zip(values, types):
+ append(add(value, typ))
+ else:
+ for value in values:
+ append(add(value))
+ command = command % tuple(literals)
+ elif isinstance(values, dict):
+ if inline:
+ adapt = self.adapt_inline
+ literals = dict((key, adapt(value))
+ for key, value in values.items())
+ else:
+ add = params.add
+ literals = {}
+ if types:
+ if (not isinstance(types, dict) or
+ len(types) < len(values)):
+ raise TypeError('The values and types do not match')
+ for key in sorted(values):
+ literals[key] = add(values[key], types[key])
+ else:
+ for key in sorted(values):
+ literals[key] = add(values[key])
+ command = command % literals
+ else:
+ raise TypeError('The values must be passed as tuple, list or dict')
+ return command, params
def cast_bool(value):
@@ -484,22 +654,20 @@
raise TypeError("Cast parameter must be callable")
for t in typ:
self[t] = cast
- self.pop('_%s % t', None)
+ self.pop('_%s' % t, None)
def reset(self, typ=None):
"""Reset the typecasts for the specified type(s) to their defaults.
When no type is specified, all typecasts will be reset.
"""
- defaults = self.defaults
if typ is None:
self.clear()
- self.update(defaults)
else:
if isinstance(typ, basestring):
typ = [typ]
for t in typ:
- self.set(t, defaults.get(t))
+ self.pop(t, None)
@classmethod
def get_default(cls, typ):
@@ -521,7 +689,7 @@
raise TypeError("Cast parameter must be callable")
for t in typ:
defaults[t] = cast
- defaults.pop('_%s % t', None)
+ defaults.pop('_%s' % t, None)
def get_attnames(self, typ):
"""Return the fields for the given record type.
@@ -602,7 +770,7 @@
"""Create a PostgreSQL type name with additional info."""
if oid in self:
return self[oid]
- simple = 'record' if relid else _simpletype[pgtype]
+ simple = 'record' if relid else _simpletypes[pgtype]
typ = DbType(regtype if self._regtypes else simple)
typ.oid = oid
typ.simple = simple
@@ -621,7 +789,7 @@
res = self.query("SELECT oid, typname, typname::regtype,"
" typtype, typcategory, typdelim, typrelid"
" FROM pg_type WHERE oid=%s::regtype" %
- (DB._adapt_qualified_param(key, 1),), (key,)).getresult()
+ (_quote_if_unqualified('$1', key),), (key,)).getresult()
except ProgrammingError:
res = None
if not res:
@@ -676,10 +844,6 @@
return cast(value)
-class _Literal(str):
- """Wrapper class for literal SQL."""
-
-
def _namedresult(q):
"""Get query result as named tuples."""
row = namedtuple('Row', q.listfields())
@@ -859,7 +1023,7 @@
# The actual PostGreSQL database connection interface:
-class DB(_Adapt):
+class DB:
"""Wrapper class for the _pg connection type."""
def __init__(self, *args, **kw):
@@ -895,6 +1059,7 @@
self._pkeys = {}
self._privileges = {}
self._args = args, kw
+ self.adapter = Adapter(self)
self.dbtypes = DbTypes(self)
db.set_cast_hook(self.dbtypes.typecast)
self.debug = None # For debugging scripts, this can be set
@@ -967,22 +1132,6 @@
"""Create a human readable parameter list."""
return ', '.join('$%d=%r' % (n, v) for n, v in enumerate(params, 1))
- @staticmethod
- def _adapt_qualified_param(name, param):
- """Quote parameter representing a qualified name.
-
- Escapes the name for use as an SQL parameter, unless the
- name contains a dot, in which case the name is ambiguous
- (could be a qualified name or just a name with a dot in it)
- and must be quoted manually by the caller.
-
- """
- if isinstance(param, int):
- param = "$%d" % param
- if isinstance(name, basestring) and '.' not in name:
- param = 'quote_ident(%s)' % (param,)
- return param
-
# Public methods
# escape_string and escape_bytea exist as methods,
@@ -1222,6 +1371,21 @@
self._do_debug(command)
return self.db.query(command)
+ def query_formatted(self, command, parameters, types=None, inline=False):
+ """Execute a formatted SQL command string.
+
+ Similar to query, but using Python format placeholders of the form
+ %s or %(names)s instead of PostgreSQL placeholders of the form $1.
+ The parameters must be passed as a tuple, list or dict. You can
+ also pass a corresponding tuple, list or dict of database types in
+ order to format the parameters properly in case there is ambiguity.
+
+ If you set inline to True, the parameters will be sent to the database
+ embedded in the SQL command, otherwise they will be sent separately.
+ """
+ return self.query(*self.adapter.format_query(
+ command, parameters, types, inline))
+
def pkey(self, table, composite=False, flush=False):
"""Get or set the primary key of a table.
@@ -1247,7 +1411,7 @@
" AND NOT a.attisdropped"
" WHERE i.indrelid=%s::regclass"
" AND i.indisprimary ORDER BY a.attnum") % (
- self._adapt_qualified_param(table, 1),)
+ _quote_if_unqualified('$1', table),)
pkey = self.db.query(q, (table,)).getresult()
if not pkey:
raise KeyError('Table %s has no primary key' % table)
@@ -1320,7 +1484,7 @@
" JOIN pg_type t ON t.oid = a.atttypid"
" WHERE a.attrelid = %s::regclass AND %s"
" AND NOT a.attisdropped ORDER BY a.attnum") % (
- self._adapt_qualified_param(table, 1), q)
+ _quote_if_unqualified('$1', table), q)
names = self.db.query(q, (table,)).getresult()
types = self.dbtypes
names = ((name[0], types.add(*name[1:])) for name in names)
@@ -1347,7 +1511,7 @@
return self._privileges[(table, privilege)]
except KeyError: # cache miss, ask the database
q = "SELECT has_table_privilege(%s, $2)" % (
- self._adapt_qualified_param(table, 1),)
+ _quote_if_unqualified('$1', table),)
q = self.db.query(q, (table, privilege))
ret = q.getresult()[0][0] == self._make_bool(True)
self._privileges[(table, privilege)] = ret # cache it
@@ -1404,12 +1568,12 @@
raise KeyError(
'Differing number of items in keyname and row')
row = dict(zip(keyname, row))
- params = []
- param = partial(self._adapt_param, params=params)
+ params = self.adapter.parameter_list()
+ adapt = params.add
col = self.escape_identifier
what = 'oid, *' if qoid else '*'
where = ' AND '.join('%s = %s' % (
- col(k), param(row[k], attnames[k])) for k in keyname)
+ col(k), adapt(row[k], attnames[k])) for k in keyname)
if 'oid' in row:
if qoid:
row[qoid] = row['oid']
@@ -1450,14 +1614,14 @@
del row['oid'] # do not insert oid
attnames = self.get_attnames(table)
qoid = _oid_key(table) if 'oid' in attnames else None
- params = []
- param = partial(self._adapt_param, params=params)
+ params = self.adapter.parameter_list()
+ adapt = params.add
col = self.escape_identifier
names, values = [], []
for n in attnames:
if n in row:
names.append(col(n))
- values.append(param(row[n], attnames[n]))
+ values.append(adapt(row[n], attnames[n]))
if not names:
raise _prg_error('No column found that can be inserted')
names, values = ', '.join(names), ', '.join(values)
@@ -1511,11 +1675,11 @@
keyname = ('oid',)
else:
raise KeyError('Missing primary key in row')
- params = []
- param = partial(self._adapt_param, params=params)
+ params = self.adapter.parameter_list()
+ adapt = params.add
col = self.escape_identifier
where = ' AND '.join('%s = %s' % (
- col(k), param(row[k], attnames[k])) for k in keyname)
+ col(k), adapt(row[k], attnames[k])) for k in keyname)
if 'oid' in row:
if qoid:
row[qoid] = row['oid']
@@ -1524,7 +1688,7 @@
keyname = set(keyname)
for n in attnames:
if n in row and n not in keyname:
- values.append('%s = %s' % (col(n), param(row[n], attnames[n])))
+ values.append('%s = %s' % (col(n), adapt(row[n], attnames[n])))
if not values:
return row
values = ', '.join(values)
@@ -1594,14 +1758,14 @@
del kw['oid'] # do not update oid
attnames = self.get_attnames(table)
qoid = _oid_key(table) if 'oid' in attnames else None
- params = []
- param = partial(self._adapt_param,params=params)
+ params = self.adapter.parameter_list()
+ adapt = params.add
col = self.escape_identifier
names, values, updates = [], [], []
for n in attnames:
if n in row:
names.append(col(n))
- values.append(param(row[n], attnames[n]))
+ values.append(adapt(row[n], attnames[n]))
names, values = ', '.join(names), ', '.join(values)
try:
keyname = self.pkey(table, True)
@@ -1708,11 +1872,11 @@
keyname = ('oid',)
else:
raise KeyError('Missing primary key in row')
- params = []
- param = partial(self._adapt_param, params=params)
+ params = self.adapter.parameter_list()
+ adapt = params.add
col = self.escape_identifier
where = ' AND '.join('%s = %s' % (
- col(k), param(row[k], attnames[k])) for k in keyname)
+ col(k), adapt(row[k], attnames[k])) for k in keyname)
if 'oid' in row:
if qoid:
row[qoid] = row['oid']
Modified: trunk/pgdb.py
==============================================================================
--- trunk/pgdb.py Sat Jan 30 14:55:18 2016 (r798)
+++ trunk/pgdb.py Sun Jan 31 13:45:58 2016 (r799)
@@ -209,7 +209,7 @@
raise TypeError("Cast parameter must be callable")
for t in typ:
self[t] = cast
- self.pop('_%s % t', None)
+ self.pop('_%s' % t, None)
def reset(self, typ=None):
"""Reset the typecasts for the specified type(s) to their defaults.
@@ -224,7 +224,18 @@
if isinstance(typ, basestring):
typ = [typ]
for t in typ:
- self.set(t, defaults.get(t))
+ cast = defaults.get(t)
+ if cast:
+ self[t] = cast
+ t = '_%s' % t
+ cast = defaults.get(t)
+ if cast:
+ self[t] = cast
+ else:
+ self.pop(t, None)
+ else:
+ self.pop(t, None)
+ self.pop('_%s' % t, None)
def create_array_cast(self, cast):
"""Create an array typecast for the given base cast."""
@@ -482,7 +493,7 @@
if isnan(value):
return "'NaN'"
return value
- if isinstance(value, (int, long, Decimal)):
+ if isinstance(value, (int, long, Decimal, Literal)):
return value
if isinstance(value, list):
# Quote value as an ARRAY constructor. This is better than using
@@ -496,18 +507,18 @@
# Quote as a ROW constructor. This is better than using a record
# literal because it carries the information that this is a record
# and not a string. We don't use the keyword ROW in order to make
- # this usable with the IN synntax as well. It is only necessary
+ # this usable with the IN syntax as well. It is only necessary
# when the records has a single column which is not really useful.
q = self._quote
return '(%s)' % ','.join(str(q(v)) for v in value)
try:
value = value.__pg_repr__()
- if isinstance(value, (tuple, list)):
- value = self._quote(value)
- return value
except AttributeError:
raise InterfaceError(
'Do not know how to adapt type %s' % type(value))
+ if isinstance(value, (tuple, list)):
+ value = self._quote(value)
+ return value
def _quoteparams(self, string, parameters):
"""Quote parameters.
@@ -1295,6 +1306,10 @@
# Additional type helpers for PyGreSQL:
+class Bytea(bytes):
+ """Construct an object capable of holding a bytea value."""
+
+
class Json:
"""Construct a wrapper for holding an object serializable to JSON."""
@@ -1311,6 +1326,18 @@
__pg_repr__ = __str__
+class Literal:
+ """Construct a wrapper for holding a literal SQL string."""
+
+ def __init__(self, sql):
+ self.sql = sql
+
+ def __str__(self):
+ return self.sql
+
+ __pg_repr__ = __str__
+
+
# If run as script, print some information:
if __name__ == '__main__':
Modified: trunk/tests/test_classic_dbwrapper.py
==============================================================================
--- trunk/tests/test_classic_dbwrapper.py Sat Jan 30 14:55:18 2016
(r798)
+++ trunk/tests/test_classic_dbwrapper.py Sun Jan 31 13:45:58 2016
(r799)
@@ -23,6 +23,7 @@
import pg # the module under test
from decimal import Decimal
+from datetime import date
from operator import itemgetter
# We need a database to test against. If LOCAL_PyGreSQL.py exists we will
@@ -176,7 +177,7 @@
def testAllDBAttributes(self):
attributes = [
- 'abort',
+ 'abort', 'adapter',
'begin',
'cancel', 'clear', 'close', 'commit',
'db', 'dbname', 'dbtypes',
@@ -197,7 +198,7 @@
'options',
'parameter', 'pkey', 'port',
'protocol_version', 'putline',
- 'query',
+ 'query', 'query_formatted',
'release', 'reopen', 'reset', 'rollback',
'savepoint', 'server_version',
'set_cast_hook', 'set_notice_receiver',
@@ -666,10 +667,10 @@
f(set(['default_with_oids', 'standard_conforming_strings']), 'on')
self.assertEqual(g('default_with_oids'), 'on')
self.assertEqual(g('standard_conforming_strings'), 'on')
- self.assertRaises(ValueError, f, set([ 'default_with_oids',
-
'standard_conforming_strings']), ['off', 'on'])
+ self.assertRaises(ValueError, f, set(['default_with_oids',
+ 'standard_conforming_strings']), ['off', 'on'])
f(set(['default_with_oids', 'standard_conforming_strings']),
- ['off', 'off'])
+ ['off', 'off'])
self.assertEqual(g('default_with_oids'), 'off')
self.assertEqual(g('standard_conforming_strings'), 'off')
f({'standard_conforming_strings': 'on', 'datestyle': 'ISO, YMD'})
@@ -876,6 +877,17 @@
except pg.ProgrammingError as error:
self.assertEqual(error.sqlstate, '22012')
+ def testQueryFormatted(self):
+ f = self.db.query_formatted
+ t = True if pg.get_bool() else 't'
+ q = f("select %s::int, %s::real, %s::text, %s::bool",
+ (3, 2.5, 'hello', True))
+ r = q.getresult()[0]
+ self.assertEqual(r, (3, 2.5, 'hello', t))
+ q = f("select %s, %s, %s, %s", (3, 2.5, 'hello', True), inline=True)
+ r = q.getresult()[0]
+ self.assertEqual(r, (3, 2.5, 'hello', t))
+
def testPkey(self):
query = self.db.query
pkey = self.db.pkey
@@ -884,20 +896,20 @@
self.createTable('%s0' % t, 'a smallint')
self.createTable('%s1' % t, 'b smallint primary key')
self.createTable('%s2' % t,
- 'c smallint, d smallint primary key')
+ 'c smallint, d smallint primary key')
self.createTable('%s3' % t,
- 'e smallint, f smallint, g smallint, h smallint,
i smallint,'
- ' primary key (f, h)')
+ 'e smallint, f smallint, g smallint, h smallint, i smallint,'
+ ' primary key (f, h)')
self.createTable('%s4' % t,
- 'e smallint, f smallint, g smallint, h smallint,
i smallint,'
- ' primary key (h, f)')
+ 'e smallint, f smallint, g smallint, h smallint, i smallint,'
+ ' primary key (h, f)')
self.createTable('%s5' % t,
- 'more_than_one_letter varchar primary key')
+ 'more_than_one_letter varchar primary key')
self.createTable('%s6' % t,
- '"with space" date primary key')
+ '"with space" date primary key')
self.createTable('%s7' % t,
- 'a_very_long_column_name varchar, "with space"
date, "42" int,'
- ' primary key (a_very_long_column_name, "with
space", "42")')
+ 'a_very_long_column_name varchar, "with space" date, "42" int,'
+ ' primary key (a_very_long_column_name, "with space", "42")')
self.assertRaises(KeyError, pkey, '%s0' % t)
self.assertEqual(pkey('%s1' % t), 'b')
self.assertEqual(pkey('%s1' % t, True), ('b',))
@@ -1021,7 +1033,7 @@
get_attnames = self.db.get_attnames
table = 'test table for get_attnames()'
self.createTable(table,
- '"Prime!" smallint, "much space" integer,
"Questions?" text')
+ '"Prime!" smallint, "much space" integer, "Questions?" text')
r = get_attnames(table)
self.assertIsInstance(r, dict)
if self.regtypes:
@@ -1051,10 +1063,10 @@
'x': 'smallint', 'z': 'smallint', 'oid': 'oid'})
else:
self.assertEqual(r, {'a': 'int', 'b': 'int', 'c': 'int',
- 'e': 'num', 'f': 'float', 'f2': 'float', 'm':
'money',
- 'normal_name': 'int', 'Special Name': 'int',
- 'u': 'text', 't': 'text', 'v': 'text',
- 'y': 'int', 'x': 'int', 'z': 'int', 'oid':
'int'})
+ 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
+ 'normal_name': 'int', 'Special Name': 'int',
+ 'u': 'text', 't': 'text', 'v': 'text',
+ 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
def testGetAttnamesWithRegtypes(self):
get_attnames = self.db.get_attnames
@@ -1399,14 +1411,14 @@
get('test_students', "D' Arcy")
except pg.DatabaseError as error:
self.assertEqual(str(error),
- 'No such record in test_students\nwhere
"firstname" = $1\n'
- 'with $1="D\' Arcy"')
+ 'No such record in test_students\nwhere "firstname" = $1\n'
+ 'with $1="D\' Arcy"')
try:
get('test_students', "Robert'); TRUNCATE TABLE test_students;--")
except pg.DatabaseError as error:
self.assertEqual(str(error),
- 'No such record in test_students\nwhere
"firstname" = $1\n'
- 'with $1="Robert\'); TRUNCATE TABLE
test_students;--"')
+ 'No such record in test_students\nwhere "firstname" = $1\n'
+ 'with $1="Robert\'); TRUNCATE TABLE test_students;--"')
q = "select * from test_students order by 1 limit 4"
r = query(q).getresult()
self.assertEqual(len(r), 3)
@@ -1419,54 +1431,54 @@
decimal = pg.get_decimal()
table = 'insert_test_table'
self.createTable(table,
- 'i2 smallint, i4 integer, i8 bigint,'
- ' d numeric, f4 real, f8 double precision, m money,'
- ' v4 varchar(4), c4 char(4), t text,'
- ' b boolean, ts timestamp', oids=True)
+ 'i2 smallint, i4 integer, i8 bigint,'
+ ' d numeric, f4 real, f8 double precision, m money,'
+ ' v4 varchar(4), c4 char(4), t text,'
+ ' b boolean, ts timestamp', oids=True)
oid_table = 'oid(%s)' % table
tests = [dict(i2=None, i4=None, i8=None),
- (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
- (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
- dict(i2=42, i4=123456, i8=9876543210),
- dict(i2=2 ** 15 - 1,
- i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
- dict(d=None), (dict(d=''), dict(d=None)),
- dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
- dict(f4=None, f8=None), dict(f4=0, f8=0),
- (dict(f4='', f8=''), dict(f4=None, f8=None)),
- (dict(d=1234.5, f4=1234.5, f8=1234.5),
- dict(d=Decimal('1234.5'))),
- dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
- dict(d=Decimal('123456789.9876543212345678987654321')),
- dict(m=None), (dict(m=''), dict(m=None)),
- dict(m=Decimal('-1234.56')),
- (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
- dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
- (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
- (dict(m=1234.5), dict(m=Decimal('1234.5'))),
- (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
- (dict(m=123456), dict(m=Decimal('123456'))),
- (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
- dict(b=None), (dict(b=''), dict(b=None)),
- dict(b='f'), dict(b='t'),
- (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
- (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
- (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
- (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
- (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
- (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
- dict(v4=None, c4=None, t=None),
- (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
- dict(v4='1234', c4='1234', t='1234' * 10),
- dict(v4='abcd', c4='abcd', t='abcdefg'),
- (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
- dict(ts=None), (dict(ts=''), dict(ts=None)),
- (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
- dict(ts='2012-12-21 00:00:00'),
- (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
- dict(ts='2012-12-21 12:21:12'),
- dict(ts='2013-01-05 12:13:14'),
- dict(ts='current_timestamp')]
+ (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
+ (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
+ dict(i2=42, i4=123456, i8=9876543210),
+ dict(i2=2 ** 15 - 1,
+ i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
+ dict(d=None), (dict(d=''), dict(d=None)),
+ dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
+ dict(f4=None, f8=None), dict(f4=0, f8=0),
+ (dict(f4='', f8=''), dict(f4=None, f8=None)),
+ (dict(d=1234.5, f4=1234.5, f8=1234.5),
+ dict(d=Decimal('1234.5'))),
+ dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
+ dict(d=Decimal('123456789.9876543212345678987654321')),
+ dict(m=None), (dict(m=''), dict(m=None)),
+ dict(m=Decimal('-1234.56')),
+ (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
+ dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
+ (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
+ (dict(m=1234.5), dict(m=Decimal('1234.5'))),
+ (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
+ (dict(m=123456), dict(m=Decimal('123456'))),
+ (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
+ dict(b=None), (dict(b=''), dict(b=None)),
+ dict(b='f'), dict(b='t'),
+ (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
+ (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
+ (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
+ (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
+ (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
+ (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
+ dict(v4=None, c4=None, t=None),
+ (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
+ dict(v4='1234', c4='1234', t='1234' * 10),
+ dict(v4='abcd', c4='abcd', t='abcdefg'),
+ (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
+ dict(ts=None), (dict(ts=''), dict(ts=None)),
+ (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
+ dict(ts='2012-12-21 00:00:00'),
+ (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
+ dict(ts='2012-12-21 12:21:12'),
+ dict(ts='2013-01-05 12:13:14'),
+ dict(ts='current_timestamp')]
for test in tests:
if isinstance(test, dict):
data = test
@@ -2056,7 +2068,7 @@
self.assertEqual(r, result)
table = 'clear_test_table'
self.createTable(table,
- 'n integer, f float, b boolean, d date, t text',
oids=True)
+ 'n integer, f float, b boolean, d date, t text', oids=True)
r = clear(table)
result = dict(n=0, f=0, b=f, d='', t='')
self.assertEqual(r, result)
@@ -2072,7 +2084,7 @@
clear = self.db.clear
table = 'test table for clear()'
self.createTable(table, '"Prime!" smallint primary key,'
- ' "much space" integer, "Questions?" text')
+ ' "much space" integer, "Questions?" text')
r = clear(table)
self.assertIsInstance(r, dict)
self.assertEqual(r['Prime!'], 0)
@@ -2215,21 +2227,21 @@
self.assertEqual(r, 'c')
table = 'delete_test_table_2'
self.createTable(table,
- 'n integer, m integer, t text, primary key (n, m)',
- values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
- for n in range(3) for m in range(2)])
+ 'n integer, m integer, t text, primary key (n, m)',
+ values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
+ for n in range(3) for m in range(2)])
self.assertRaises(KeyError, self.db.delete, table, dict(n=2, t='b'))
self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 1)
r = [r[0] for r in query('select t from "%s" where n=2'
- ' order by m' % table).getresult()]
+ ' order by m' % table).getresult()]
self.assertEqual(r, ['c'])
self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 0)
r = [r[0] for r in query('select t from "%s" where n=3'
- ' order by m' % table).getresult()]
+ ' order by m' % table).getresult()]
self.assertEqual(r, ['e', 'f'])
self.assertEqual(self.db.delete(table, dict(n=3, m=1)), 1)
r = [r[0] for r in query('select t from "%s" where n=3'
- ' order by m' % table).getresult()]
+ ' order by m' % table).getresult()]
self.assertEqual(r, ['f'])
def testDeleteWithQuotedNames(self):
@@ -2237,8 +2249,8 @@
query = self.db.query
table = 'test table for delete()'
self.createTable(table, '"Prime!" smallint primary key,'
- ' "much space" integer, "Questions?" text',
- values=[(19, 5005, 'Yes!')])
+ ' "much space" integer, "Questions?" text',
+ values=[(19, 5005, 'Yes!')])
r = {'Prime!': 17}
r = delete(table, r)
self.assertEqual(r, 0)
@@ -2254,9 +2266,9 @@
delete = self.db.delete
query = self.db.query
self.createTable('test_parent',
- 'n smallint primary key', values=range(3))
+ 'n smallint primary key', values=range(3))
self.createTable('test_child',
- 'n smallint primary key references test_parent',
values=range(3))
+ 'n smallint primary key references test_parent', values=range(3))
q = ("select (select count(*) from test_parent),"
" (select count(*) from test_child)")
self.assertEqual(query(q).getresult()[0], (3, 3))
@@ -2440,7 +2452,7 @@
r = query(q).getresult()[0]
self.assertEqual(r, (0, 0, 0, 0))
self.assertRaises(ValueError, truncate,
- ['test_parent*', 'test_child'], only=[True, False])
+ ['test_parent*', 'test_child'], only=[True, False])
truncate(['test_parent*', 'test_child'], only=[False, True])
def testTruncateQuoted(self):
@@ -2474,7 +2486,7 @@
names = [(1, 'Homer'), (2, 'Marge'),
(3, 'Bart'), (4, 'Lisa'), (5, 'Maggie')]
self.createTable(table,
- 'id smallint primary key, name varchar', values=names)
+ 'id smallint primary key, name varchar', values=names)
r = get_as_list(table)
self.assertIsInstance(r, list)
self.assertEqual(r, names)
@@ -2503,7 +2515,7 @@
self.assertIsInstance(r, list)
self.assertEqual(r, [('Maggie',), ('Marge',)])
r = get_as_list(table, what='name',
- where=["name like 'Ma%'", "name like '%r%'"])
+ where=["name like 'Ma%'", "name like '%r%'"])
self.assertIsInstance(r, list)
self.assertEqual(r, [('Marge',)])
r = get_as_list(table, what='name', order='id')
@@ -2588,8 +2600,8 @@
colors = [(1, '#7cb9e8', 'Aero'), (2, '#b5a642', 'Brass'),
(3, '#b2ffff', 'Celeste'), (4, '#c19a6b', 'Desert')]
self.createTable(table,
- 'id smallint primary key, rgb char(7), name varchar',
- values=colors)
+ 'id smallint primary key, rgb char(7), name varchar',
+ values=colors)
# keyname must be string, list or tuple
self.assertRaises(KeyError, get_as_dict, table, 3)
self.assertRaises(KeyError, get_as_dict, table, dict(id=None))
@@ -2667,7 +2679,7 @@
r = get_as_dict(table, keyname='rgb', what=['rgb', 'name'],
scalar=True)
self.assertIsInstance(r, OrderedDict)
expected = OrderedDict((row[1], row[2])
- for row in sorted(colors, key=itemgetter(1)))
+ for row in sorted(colors, key=itemgetter(1)))
self.assertEqual(r, expected)
for key in r:
self.assertIsInstance(key, str)
@@ -2678,7 +2690,7 @@
if OrderedDict is not dict: # Python > 2.6
self.assertEqual(r.keys(), expected.keys())
r = get_as_dict(table, what='id, name',
- where="rgb like '#b%'", scalar=True)
+ where="rgb like '#b%'", scalar=True)
self.assertIsInstance(r, OrderedDict)
expected = OrderedDict((row[0], row[2]) for row in colors[1:3])
self.assertEqual(r, expected)
@@ -2692,13 +2704,13 @@
self.assertEqual(r.keys(), expected.keys())
expected = r
r = get_as_dict(table, what=['name', 'id'],
- where=['id > 1', 'id < 4', "rgb like '#b%'",
- "name not like 'A%'", "name not like '%t'"],
scalar=True)
+ where=['id > 1', 'id < 4', "rgb like '#b%'",
+ "name not like 'A%'", "name not like '%t'"], scalar=True)
self.assertEqual(r, expected)
r = get_as_dict(table, what='name, id', limit=2, offset=1, scalar=True)
self.assertEqual(r, expected)
r = get_as_dict(table, keyname=('id',), what=('name', 'id'),
- where=('id > 1', 'id < 4'), order=('id',), scalar=True)
+ where=('id > 1', 'id < 4'), order=('id',), scalar=True)
self.assertEqual(r, expected)
r = get_as_dict(table, limit=1)
self.assertEqual(len(r), 1)
@@ -3044,9 +3056,9 @@
def testArray(self):
self.createTable('arraytest',
- 'id smallint, i2 smallint[], i4 integer[], i8
bigint[],'
- ' d numeric[], f4 real[], f8 double precision[], m
money[],'
- ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
+ 'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
+ ' d numeric[], f4 real[], f8 double precision[], m money[],'
+ ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
r = self.db.get_attnames('arraytest')
if self.regtypes:
self.assertEqual(r, dict(
@@ -3068,20 +3080,20 @@
odd_money = decimal('1234567123.25')
t, f = (True, False) if pg.get_bool() else ('t', 'f')
data = dict(id=42, i2=[42, 1234, None, 0, -1],
- i4=[42, 123456789, None, 0, 1, -1],
- i8=[long(42), long(123456789123456789), None,
- long(0), long(1), long(-1)],
- d=[decimal(42), long_decimal, None,
- decimal(0), decimal(1), decimal(-1), -long_decimal],
- f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
- float('inf'), float('-inf')],
- f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
- float('inf'), float('-inf')],
- m=[decimal('42.00'), odd_money, None,
- decimal('0.00'), decimal('1.00'), decimal('-1.00'),
-odd_money],
- b=[t, f, t, None, f, t, None, None, t],
- v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', ' ',
None],
- t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
+ i4=[42, 123456789, None, 0, 1, -1],
+ i8=[long(42), long(123456789123456789), None,
+ long(0), long(1), long(-1)],
+ d=[decimal(42), long_decimal, None,
+ decimal(0), decimal(1), decimal(-1), -long_decimal],
+ f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
+ float('inf'), float('-inf')],
+ f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
+ float('inf'), float('-inf')],
+ m=[decimal('42.00'), odd_money, None,
+ decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
+ b=[t, f, t, None, f, t, None, None, t],
+ v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', ' ', None],
+ t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
r = data.copy()
self.db.insert('arraytest', r)
self.assertEqual(r, data)
@@ -3102,7 +3114,7 @@
self.db.insert('arraytest', r)
self.assertEqual(r['i'], [1, 2, 3])
self.assertEqual(r['t'], ['a', 'b', 'c'])
- L = pg._Literal
+ L = pg.Literal
r = dict(i=L("ARRAY[1, 2, 3]"), t=L("ARRAY['a', 'b', 'c']"))
self.db.insert('arraytest', r)
self.assertEqual(r['i'], [1, 2, 3])
@@ -3270,12 +3282,12 @@
self.assertEqual(person_typ, 'record')
if self.regtypes:
self.assertEqual(person_typ.attnames,
- dict(name='character varying', age='smallint',
- married='boolean', weight='real',
salary='money'))
+ dict(name='character varying', age='smallint',
+ married='boolean', weight='real', salary='money'))
else:
self.assertEqual(person_typ.attnames,
- dict(name='text', age='int', married='bool',
- weight='float', salary='money'))
+ dict(name='text', age='int', married='bool',
+ weight='float', salary='money'))
decimal = pg.get_decimal()
if pg.get_bool():
bool_class = bool
@@ -3406,7 +3418,7 @@
else:
self.assertEqual(person_typ.attnames,
dict(name='text', age='int'))
- person = pg._Literal("('John Doe', 61)")
+ person = pg.Literal("('John Doe', 61)")
r = self.db.insert('test_person', None, person=person)
p = r['person']
self.assertIsInstance(p, tuple)
@@ -3460,8 +3472,18 @@
def testDbTypesTypecast(self):
dbtypes = self.db.dbtypes
self.assertIsInstance(dbtypes, dict)
+ self.assertNotIn('int4', dbtypes)
+ self.assertIs(dbtypes.get_typecast('int4'), int)
+ dbtypes.set_typecast('int4', float)
+ self.assertIs(dbtypes.get_typecast('int4'), float)
+ dbtypes.reset_typecast('int4')
+ self.assertIs(dbtypes.get_typecast('int4'), int)
+ dbtypes.set_typecast('int4', float)
+ self.assertIs(dbtypes.get_typecast('int4'), float)
+ dbtypes.reset_typecast()
+ self.assertIs(dbtypes.get_typecast('int4'), int)
self.assertNotIn('circle', dbtypes)
- cast_circle = dbtypes.get_typecast('circle')
+ self.assertIsNone(dbtypes.get_typecast('circle'))
squared_circle = lambda v: 'Squared Circle: %s' % v
dbtypes.set_typecast('circle', squared_circle)
self.assertIs(dbtypes.get_typecast('circle'), squared_circle)
@@ -3471,7 +3493,7 @@
self.assertEqual(dbtypes.typecast('Impossible', 'circle'),
'Squared Circle: Impossible')
dbtypes.reset_typecast('circle')
- self.assertIs(dbtypes.get_typecast('circle'), cast_circle)
+ self.assertIsNone(dbtypes.get_typecast('circle'))
def testGetSetTypeCast(self):
get_typecast = pg.get_typecast
@@ -3485,6 +3507,7 @@
self.assertIs(get_typecast('float4'), float)
self.assertIs(get_typecast('bool'), pg.cast_bool)
cast_circle = get_typecast('circle')
+ self.addCleanup(set_typecast, 'circle', cast_circle)
squared_circle = lambda v: 'Squared Circle: %s' % v
self.assertNotIn('circle', dbtypes)
set_typecast('circle', squared_circle)
@@ -3606,6 +3629,248 @@
return getattr(pg, 'set_' + option)(cls.saved_options[option])
+class TestDBClassAdapter(unittest.TestCase):
+ """Test the adapter object associatd with the DB class."""
+
+ def setUp(self):
+ self.db = DB()
+ self.adapter = self.db.adapter
+
+ def tearDown(self):
+ try:
+ self.db.close()
+ except pg.InternalError:
+ pass
+
+ def testGuessSimpleType(self):
+ f = self.adapter.guess_simple_type
+ self.assertEqual(f(pg.Bytea(b'test')), 'bytea')
+ self.assertEqual(f('string'), 'text')
+ self.assertEqual(f(b'string'), 'text')
+ self.assertEqual(f(True), 'bool')
+ self.assertEqual(f(3), 'int')
+ self.assertEqual(f(2.75), 'float')
+ self.assertEqual(f(Decimal('4.25')), 'num')
+ self.assertEqual(f(date(2016, 1, 30)), 'date')
+ self.assertEqual(f([1, 2, 3]), 'int[]')
+ self.assertEqual(f([[[123]]]), 'int[]')
+ self.assertEqual(f(['a', 'b', 'c']), 'text[]')
+ self.assertEqual(f([[['abc']]]), 'text[]')
+ self.assertEqual(f([False, True]), 'bool[]')
+ self.assertEqual(f([[[False]]]), 'bool[]')
+ r = f(('string', True, 3, 2.75, [1], [False]))
+ self.assertEqual(r, 'record')
+ self.assertEqual(list(r.attnames.values()),
+ ['text', 'bool', 'int', 'float', 'int[]', 'bool[]'])
+
+ def testAdaptQueryTypedList(self):
+ format_query = self.adapter.format_query
+ self.assertRaises(TypeError, format_query,
+ '%s,%s', (1, 2), ('int2',))
+ self.assertRaises(TypeError, format_query,
+ '%s,%s', (1,), ('int2', 'int2'))
+ values = (3, 7.5, 'hello', True)
+ types = ('int4', 'float4', 'text', 'bool')
+ sql, params = format_query("select %s,%s,%s,%s", values, types)
+ self.assertEqual(sql, 'select $1,$2,$3,$4')
+ self.assertEqual(params, [3, 7.5, 'hello', 't'])
+ types = ('bool', 'bool', 'bool', 'bool')
+ sql, params = format_query("select %s,%s,%s,%s", values, types)
+ self.assertEqual(sql, 'select $1,$2,$3,$4')
+ self.assertEqual(params, ['t', 't', 'f', 't'])
+ values = ('2016-01-30', 'current_date')
+ types = ('date', 'date')
+ sql, params = format_query("values(%s,%s)", values, types)
+ self.assertEqual(sql, 'values($1,current_date)')
+ self.assertEqual(params, ['2016-01-30'])
+ values = ([1, 2, 3], ['a', 'b', 'c'])
+ types = ('_int4', '_text')
+ sql, params = format_query("%s::int4[],%s::text[]", values, types)
+ self.assertEqual(sql, '$1::int4[],$2::text[]')
+ self.assertEqual(params, ['{1,2,3}', '{a,b,c}'])
+ types = ('_bool', '_bool')
+ sql, params = format_query("%s::bool[],%s::bool[]", values, types)
+ self.assertEqual(sql, '$1::bool[],$2::bool[]')
+ self.assertEqual(params, ['{t,t,t}', '{f,f,f}'])
+ values = [(3, 7.5, 'hello', True, [123], ['abc'])]
+ t = self.adapter.simple_type
+ typ = t('record')
+ typ._get_attnames = lambda _self: pg.AttrDict([
+ ('i', t('int')), ('f', t('float')),
+ ('t', t('text')), ('b', t('bool')),
+ ('i3', t('int[]')), ('t3', t('text[]'))])
+ types = [typ]
+ sql, params = format_query('select %s', values, types)
+ self.assertEqual(sql, 'select $1')
+ self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
+
+ def testAdaptQueryTypedDict(self):
+ format_query = self.adapter.format_query
+ self.assertRaises(TypeError, format_query,
+ '%s,%s', dict(i1=1, i2=2), dict(i1='int2'))
+ values = dict(i=3, f=7.5, t='hello', b=True)
+ types = dict(i='int4', f='float4',
+ t='text', b='bool')
+ sql, params = format_query(
+ "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
+ self.assertEqual(sql, 'select $3,$2,$4,$1')
+ self.assertEqual(params, ['t', 7.5, 3, 'hello'])
+ types = dict(i='bool', f='bool',
+ t='bool', b='bool')
+ sql, params = format_query(
+ "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
+ self.assertEqual(sql, 'select $3,$2,$4,$1')
+ self.assertEqual(params, ['t', 't', 't', 'f'])
+ values = dict(d1='2016-01-30', d2='current_date')
+ types = dict(d1='date', d2='date')
+ sql, params = format_query("values(%(d1)s,%(d2)s)", values, types)
+ self.assertEqual(sql, 'values($1,current_date)')
+ self.assertEqual(params, ['2016-01-30'])
+ values = dict(i=[1, 2, 3], t=['a', 'b', 'c'])
+ types = dict(i='_int4', t='_text')
+ sql, params = format_query(
+ "%(i)s::int4[],%(t)s::text[]", values, types)
+ self.assertEqual(sql, '$1::int4[],$2::text[]')
+ self.assertEqual(params, ['{1,2,3}', '{a,b,c}'])
+ types = dict(i='_bool', t='_bool')
+ sql, params = format_query(
+ "%(i)s::bool[],%(t)s::bool[]", values, types)
+ self.assertEqual(sql, '$1::bool[],$2::bool[]')
+ self.assertEqual(params, ['{t,t,t}', '{f,f,f}'])
+ values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
+ t = self.adapter.simple_type
+ typ = t('record')
+ typ._get_attnames = lambda _self: pg.AttrDict([
+ ('i', t('int')), ('f', t('float')),
+ ('t', t('text')), ('b', t('bool')),
+ ('i3', t('int[]')), ('t3', t('text[]'))])
+ types = dict(record=typ)
+ sql, params = format_query('select %(record)s', values, types)
+ self.assertEqual(sql, 'select $1')
+ self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
+
+ def testAdaptQueryUntypedList(self):
+ format_query = self.adapter.format_query
+ values = (3, 7.5, 'hello', True)
+ sql, params = format_query("select %s,%s,%s,%s", values)
+ self.assertEqual(sql, 'select $1,$2,$3,$4')
+ self.assertEqual(params, [3, 7.5, 'hello', 't'])
+ values = [date(2016, 1, 30), 'current_date']
+ sql, params = format_query("values(%s,%s)", values)
+ self.assertEqual(sql, 'values($1,$2)')
+ self.assertEqual(params, values)
+ values = ([1, 2, 3], ['a', 'b', 'c'], [True, False, True])
+ sql, params = format_query("%s,%s,%s", values)
+ self.assertEqual(sql, "$1,$2,$3")
+ self.assertEqual(params, ['{1,2,3}', '{a,b,c}', '{t,f,t}'])
+ values = ([[1, 2], [3, 4]], [['a', 'b'], ['c', 'd']],
+ [[True, False], [False, True]])
+ sql, params = format_query("%s,%s,%s", values)
+ self.assertEqual(sql, "$1,$2,$3")
+ self.assertEqual(params, [
+ '{{1,2},{3,4}}', '{{a,b},{c,d}}', '{{t,f},{f,t}}'])
+ values = [(3, 7.5, 'hello', True, [123], ['abc'])]
+ sql, params = format_query('select %s', values)
+ self.assertEqual(sql, 'select $1')
+ self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
+
+ def testAdaptQueryUntypedDict(self):
+ format_query = self.adapter.format_query
+ values = dict(i=3, f=7.5, t='hello', b=True)
+ sql, params = format_query(
+ "select %(i)s,%(f)s,%(t)s,%(b)s", values)
+ self.assertEqual(sql, 'select $3,$2,$4,$1')
+ self.assertEqual(params, ['t', 7.5, 3, 'hello'])
+ values = dict(d1='2016-01-30', d2='current_date')
+ sql, params = format_query("values(%(d1)s,%(d2)s)", values)
+ self.assertEqual(sql, 'values($1,$2)')
+ self.assertEqual(params, [values['d1'], values['d2']])
+ values = dict(i=[1, 2, 3], t=['a', 'b', 'c'], b=[True, False, True])
+ sql, params = format_query("%(i)s,%(t)s,%(b)s", values)
+ self.assertEqual(sql, "$2,$3,$1")
+ self.assertEqual(params, ['{t,f,t}', '{1,2,3}', '{a,b,c}'])
+ values = dict(i=[[1, 2], [3, 4]], t=[['a', 'b'], ['c', 'd']],
+ b=[[True, False], [False, True]])
+ sql, params = format_query("%(i)s,%(t)s,%(b)s", values)
+ self.assertEqual(sql, "$2,$3,$1")
+ self.assertEqual(params, [
+ '{{t,f},{f,t}}', '{{1,2},{3,4}}', '{{a,b},{c,d}}'])
+ values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
+ sql, params = format_query('select %(record)s', values)
+ self.assertEqual(sql, 'select $1')
+ self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
+
+ def testAdaptQueryInlineList(self):
+ format_query = self.adapter.format_query
+ values = (3, 7.5, 'hello', True)
+ sql, params = format_query("select %s,%s,%s,%s", values, inline=True)
+ self.assertEqual(sql, "select 3,7.5,'hello',true")
+ self.assertEqual(params, [])
+ values = [date(2016, 1, 30), 'current_date']
+ sql, params = format_query("values(%s,%s)", values, inline=True)
+ self.assertEqual(sql, "values('2016-01-30','current_date')")
+ self.assertEqual(params, [])
+ values = ([1, 2, 3], ['a', 'b', 'c'], [True, False, True])
+ sql, params = format_query("%s,%s,%s", values, inline=True)
+ self.assertEqual(sql,
+ "ARRAY[1,2,3],ARRAY['a','b','c'],ARRAY[true,false,true]")
+ self.assertEqual(params, [])
+ values = ([[1, 2], [3, 4]], [['a', 'b'], ['c', 'd']],
+ [[True, False], [False, True]])
+ sql, params = format_query("%s,%s,%s", values, inline=True)
+ self.assertEqual(sql, "ARRAY[[1,2],[3,4]],ARRAY[['a','b'],['c','d']],"
+ "ARRAY[[true,false],[false,true]]")
+ self.assertEqual(params, [])
+ values = [(3, 7.5, 'hello', True, [123], ['abc'])]
+ sql, params = format_query('select %s', values, inline=True)
+ self.assertEqual(sql,
+ "select (3,7.5,'hello',true,ARRAY[123],ARRAY['abc'])")
+ self.assertEqual(params, [])
+
+ def testAdaptQueryInlineDict(self):
+ format_query = self.adapter.format_query
+ values = dict(i=3, f=7.5, t='hello', b=True)
+ sql, params = format_query(
+ "select %(i)s,%(f)s,%(t)s,%(b)s", values, inline=True)
+ self.assertEqual(sql, "select 3,7.5,'hello',true")
+ self.assertEqual(params, [])
+ values = dict(d1='2016-01-30', d2='current_date')
+ sql, params = format_query(
+ "values(%(d1)s,%(d2)s)", values, inline=True)
+ self.assertEqual(sql, "values('2016-01-30','current_date')")
+ self.assertEqual(params, [])
+ values = dict(i=[1, 2, 3], t=['a', 'b', 'c'], b=[True, False, True])
+ sql, params = format_query("%(i)s,%(t)s,%(b)s", values, inline=True)
+ self.assertEqual(sql,
+ "ARRAY[1,2,3],ARRAY['a','b','c'],ARRAY[true,false,true]")
+ self.assertEqual(params, [])
+ values = dict(i=[[1, 2], [3, 4]], t=[['a', 'b'], ['c', 'd']],
+ b=[[True, False], [False, True]])
+ sql, params = format_query("%(i)s,%(t)s,%(b)s", values, inline=True)
+ self.assertEqual(sql, "ARRAY[[1,2],[3,4]],ARRAY[['a','b'],['c','d']],"
+ "ARRAY[[true,false],[false,true]]")
+ self.assertEqual(params, [])
+ values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
+ sql, params = format_query('select %(record)s', values, inline=True)
+ self.assertEqual(sql,
+ "select (3,7.5,'hello',true,ARRAY[123],ARRAY['abc'])")
+ self.assertEqual(params, [])
+
+ def testAdaptQueryWithPgRepr(self):
+ format_query = self.adapter.format_query
+ self.assertRaises(TypeError, format_query,
+ '%s', object(), inline=True)
+ class TestObject:
+ def __pg_repr__(self):
+ return "'adapted'"
+ sql, params = format_query('select %s', [TestObject()], inline=True)
+ self.assertEqual(sql, "select 'adapted'")
+ self.assertEqual(params, [])
+ sql, params = format_query('select %s', [[TestObject()]], inline=True)
+ self.assertEqual(sql, "select ARRAY['adapted']")
+ self.assertEqual(params, [])
+
+
class TestSchemas(unittest.TestCase):
"""Test correct handling of schemas (namespaces)."""
Modified: trunk/tests/test_dbapi20.py
==============================================================================
--- trunk/tests/test_dbapi20.py Sat Jan 30 14:55:18 2016 (r798)
+++ trunk/tests/test_dbapi20.py Sun Jan 31 13:45:58 2016 (r799)
@@ -827,6 +827,18 @@
values[4] = values[6] = False
self.assertEqual(rows, values)
+ def test_literal(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ value = "lower('Hello')"
+ cur.execute("select %s, %s", (value, pgdb.Literal(value)))
+ row = cur.fetchone()
+ finally:
+ con.close()
+ self.assertEqual(row, (value, 'hello'))
+
+
def test_json(self):
inval = {"employees":
[{"firstName": "John", "lastName": "Doe", "age": 61}]}
_______________________________________________
PyGreSQL mailing list
[email protected]
https://mail.vex.net/mailman/listinfo.cgi/pygresql