Hi Michael,
Great project!
I want to use it, but ran into a showstopper situation with the lack of
support for a few datatypes.
So....
Below please find a patch that:
-- Adds support for SMALLINT, DATE and TIME as follow:
DB SMALLINT DATE TIME NOTE
------------ ---- ---- ----
------------------------------------
Postgresql X X X No Problems -- God, I love
this DB
MySQL X X X MySQL does not support
fractional
seconds
SQLite X X no support for TIME in pysqlite,
(tho the DB supports it fine)
Oracle X X Oracle does not support TIME
-- Adds support for a new --dburi parameter to test/testbase.py
-- Cleans up some dburi generation in testbase.py when --db is used
-- Adds conditional testing for microsecond and TIME datatypes to the
DateTest() unit test in test/types.py.
Support time the TIME datatype is determined by trying:
engine.type_descriptor(types.TIME).get_col_spec()
if that fails, the dbengine lacks TIME support, and the test is adjusted
accordingly.
Support for microseconds is currently done by looking at the name of the
engine module name -- this is a hack and there should be better support
for determining this -- perhaps in the engine.descriptor() method?
....
I ran regression test for Postgresql and SQLlite; they pass OK. I'm an
Oracle
newbie, and can't figure out how to get the dburi to work properly. MySQL
needs to be regression tested as well.
Thanks,
Rick
-----------------------------------
Index: test/types.py
===================================================================
--- test/types.py (revision 888)
+++ test/types.py (working copy)
@@ -5,8 +5,6 @@
db = testbase.db
-
-
class OverrideTest(PersistTest):
def testprocessing(self):
@@ -21,7 +19,8 @@
return typeobj()
def adapt_args(self):
return self
-
+
+ global users
users = Table('users', db,
Column('user_id', Integer, primary_key = True),
Column('goofy', MyType, nullable = False)
@@ -41,11 +40,16 @@
print repr(l)
self.assert_(l == [(2, u'BIND_INjackBIND_OUT'), (3,
u'BIND_INlalaBIND_OUT'), (4, u'BIND_INfredBIND_OUT')])
+ def tearDownAll(self):
+ global users
+ users.drop()
+
class ColumnsTest(AssertMixin):
def testcolumns(self):
expectedResults = { 'int_column': 'int_column INTEGER',
+ 'smallint_column': 'smallint_column SMALLINT',
'varchar_column': 'varchar_column
VARCHAR(20)',
'numeric_column': 'numeric_column
NUMERIC(12, 3)',
'float_column': 'float_column
NUMERIC(25, 2)'
@@ -57,6 +61,7 @@
print db.engine.__module__
testTable = Table('testColumns', db,
Column('int_column', Integer),
+ Column('smallint_column', Smallinteger),
Column('varchar_column', String(20)),
Column('numeric_column', Numeric(12,3)),
Column('float_column', Float(25)),
@@ -97,39 +102,59 @@
class DateTest(AssertMixin):
def setUpAll(self):
- global users_with_date
- users_with_date = Table('query_users_with_date', db,
- Column('user_id', INT, primary_key = True),
- Column('user_name', VARCHAR(20)),
- Column('user_date', DateTime),
- redefine = True
- )
+ global users_with_date, insert_data
+
+ insert_data = [[7, 'jack', datetime.datetime(2005, 11, 10, 0,
0), datetime.date(2005,11,10), datetime.time(12,20,2)],
+ [8, 'roy', datetime.datetime(2005, 11, 10, 11,
52, 35), datetime.date(2005,10,10), datetime.time(0,0,0)],
+ [9, 'foo', datetime.datetime(2005, 11, 10, 11,
52, 35, 54839), datetime.date(1970,4,1), datetime.time(23,59,59,999)],
+ [10, 'colber', None, None, None]]
+
+ fnames = ['user_id', 'user_name', 'user_datetime', 'user_date',
'user_time']
+
+ collist = [Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)), Column('user_datetime', DateTime),
+ Column('user_date', Date), Column('user_time', Time)]
+
+
+
+ if db.engine.__module__.endswith('mysql'):
+ # strip microseconds -- not supported by this engine
(should be an easier way to detect this)
+ for d in insert_data:
+ d[2] = d[2].replace(microsecond=0)
+ d[4] = d[4].replace(microsecond=0)
+
+ try:
+ db.type_descriptor(types.TIME).get_col_spec()
+ except:
+ # don't test TIME type -- not supported by this engine
+ insert_data = [d[:-1] for d in insert_data]
+ fnames = fnames[:-1]
+ collist = collist[:-1]
+
+
+ users_with_date = Table('query_users_with_date', db, redefine =
True, *collist)
users_with_date.create()
- users_with_date.insert().execute(user_id = 7, user_name =
'jack', user_date=datetime.datetime(2005,11,10))
- users_with_date.insert().execute(user_id = 8, user_name =
'roy', user_date=datetime.datetime(2005,11,10, 11,52,35))
- users_with_date.insert().execute(user_id = 9, user_name =
'foo', user_date=datetime.datetime(2005,11,10, 11,52,35, 54839))
- users_with_date.insert().execute(user_id = 10, user_name =
'colber', user_date=None)
+
+ insert_dicts = [dict(zip(fnames, d)) for d in insert_data]
+ for idict in insert_dicts:
+ users_with_date.insert().execute(**idict) # insert the data
+
def tearDownAll(self):
users_with_date.drop()
def testdate(self):
- l = users_with_date.select().execute().fetchall()
- l = [[c for c in r] for r in l]
- if db.engine.__module__.endswith('mysql'):
- x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)],
[8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo',
datetime.datetime(2005, 11, 10, 11, 52, 35)], [10, 'colber', None]]
- else:
- x = [[7, 'jack', datetime.datetime(2005, 11, 10, 0, 0)],
[8, 'roy', datetime.datetime(2005, 11, 10, 11, 52, 35)], [9, 'foo',
datetime.datetime(2005, 11, 10, 11, 52, 35, 54839)], [10, 'colber', None]]
- print repr(l)
- print repr(x)
- self.assert_(l == x)
+ global insert_data
+ l = map(list, users_with_date.select().execute().fetchall())
+ self.assert_(l == insert_data, 'DateTest mismatch: got:%s
expected:%s' % (l, insert_data))
+
+
def testtextdate(self):
- x = db.text("select user_date from query_users_with_date",
typemap={'user_date':DateTime}).execute().fetchall()
+ x = db.text("select user_datetime from query_users_with_date",
typemap={'user_datetime':DateTime}).execute().fetchall()
print repr(x)
self.assert_(isinstance(x[0][0], datetime.datetime))
- #x = db.text("select * from query_users_with_date where
user_date=:date", bindparams=[bindparam('date',
)]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
+ #x = db.text("select * from query_users_with_date where
user_datetime=:date", bindparams=[bindparam('date',
)]).execute(date=datetime.datetime(2005, 11, 10, 11, 52, 35)).fetchall()
#print repr(x)
if __name__ == "__main__":
Index: test/testbase.py
===================================================================
--- test/testbase.py (revision 888)
+++ test/testbase.py (working copy)
@@ -7,6 +7,7 @@
#import sqlalchemy.databases.mysql as mysql
echo = True
+echo = False
#echo = 'debug'
db = None
db_uri = None
@@ -14,31 +15,32 @@
def parse_argv():
# we are using the unittest main runner, so we are just popping out
the
# arguments we need instead of using our own getopt type of thing
+ global db, db_uri
+
+ DBTYPE = 'sqlite'
+
if len(sys.argv) >= 3:
- if sys.argv[1] == '--db':
+ if sys.argv[1] == '--dburi':
+ (param, db_uri) = (sys.argv.pop(1), sys.argv.pop(1))
+ elif sys.argv[1] == '--db':
(param, DBTYPE) = (sys.argv.pop(1), sys.argv.pop(1))
- else:
- DBTYPE = 'sqlite'
- global db, db_uri
- if DBTYPE == 'sqlite':
- try:
+ if (None == db_uri):
+ if DBTYPE == 'sqlite':
db_uri = 'sqlite://filename=:memory:'
- db = engine.create_engine(db_uri, echo=echo,
default_ordering=True)
- except:
- raise "Could not create sqlite engine. specify --db
<sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
- elif DBTYPE == 'sqlite_file':
- db_uri = 'sqlite://filename=querytest.db'
+ elif DBTYPE == 'sqlite_file':
+ db_uri = 'sqlite://filename=querytest.db'
+ elif DBTYPE == 'postgres':
+ db_uri =
'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger'
+ elif DBTYPE == 'mysql':
+ db_uri =
'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger'
+ elif DBTYPE == 'oracle':
+ db_uri = 'oracle://user=scott&password=tiger'
+ try:
db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
- elif DBTYPE == 'postgres':
- db_uri =
'postgres://database=test&port=5432&host=127.0.0.1&user=scott&password=tiger'
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
- elif DBTYPE == 'mysql':
- db_uri = 'mysql://db=test&host=127.0.0.1&user=scott&passwd=tiger'
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
- elif DBTYPE == 'oracle':
- db_uri = 'oracle://user=scott&password=tiger'
- db = engine.create_engine(db_uri, echo=echo, default_ordering=True)
+ except:
+ raise "Could not create engine. specify --db
<sqlite|sqlite_file|postgres|mysql|oracle> to test runner."
+
db = EngineAssert(db)
class PersistTest(unittest.TestCase):
Index: lib/sqlalchemy/databases/sqlite.py
===================================================================
--- lib/sqlalchemy/databases/sqlite.py (revision 888)
+++ lib/sqlalchemy/databases/sqlite.py (working copy)
@@ -15,6 +15,8 @@
from sqlalchemy.ansisql import *
import datetime,time
+pysqlite2_timesupport = False # Change this if the init.d guys ever
get around to supporting time cols
+
try:
from pysqlite2 import dbapi2 as sqlite
except:
@@ -26,10 +28,13 @@
class SLInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class SLSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class SLDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "TIMESTAMP"
- def convert_result_value(self, value, engine):
+ def _cvt(self, value, engine, fmt):
if value is None:
return None
parts = value.split('.')
@@ -38,9 +43,22 @@
microsecond = int(microsecond)
except ValueError:
(value, microsecond) = (value, 0)
- tup = time.strptime(value, "%Y-%m-%d %H:%M:%S")
- return datetime.datetime(microsecond=microsecond, *tup[0:6])
-
+ return time.strptime(value, fmt)[0:6] + (microsecond,)
+ def convert_result_value(self, value, engine):
+ tup = self._cvt(value, engine, "%Y-%m-%d %H:%M:%S")
+ return tup and datetime.datetime(*tup)
+class SLDate(SLDateTime):
+ def get_col_spec(self):
+ return "DATE"
+ def convert_result_value(self, value, engine):
+ tup = self._cvt(value, engine, "%Y-%m-%d")
+ return tup and datetime.date(*tup[0:3])
+class SLTime(SLDateTime):
+ def get_col_spec(self):
+ return "TIME"
+ def convert_result_value(self, value, engine):
+ tup = self._cvt(value, engine, "%H:%M:%S")
+ return tup and datetime.time(*tup[4:7])
class SLText(sqltypes.TEXT):
def get_col_spec(self):
return "TEXT"
@@ -59,9 +77,11 @@
colspecs = {
sqltypes.Integer : SLInteger,
+ sqltypes.Smallinteger : SLSmallInteger,
sqltypes.Numeric : SLNumeric,
sqltypes.Float : SLNumeric,
sqltypes.DateTime : SLDateTime,
+ sqltypes.Date : SLDate,
sqltypes.String : SLString,
sqltypes.Binary : SLBinary,
sqltypes.Boolean : SLBoolean,
@@ -71,15 +91,22 @@
pragma_names = {
'INTEGER' : SLInteger,
+ 'SMALLINT' : SLSmallInteger,
'VARCHAR' : SLString,
'CHAR' : SLChar,
'TEXT' : SLText,
'NUMERIC' : SLNumeric,
'FLOAT' : SLNumeric,
'TIMESTAMP' : SLDateTime,
+ 'DATETIME' : SLDateTime,
+ 'DATE' : SLDate,
'BLOB' : SLBinary,
}
+if pysqlite2_timesupport:
+ colspecs.update({sqltypes.Time : SLTime})
+ pragma_names.update({'TIME' : SLTime})
+
def engine(opts, **params):
return SQLiteSQLEngine(opts, **params)
Index: lib/sqlalchemy/databases/mysql.py
===================================================================
--- lib/sqlalchemy/databases/mysql.py (revision 888)
+++ lib/sqlalchemy/databases/mysql.py (working copy)
@@ -28,9 +28,18 @@
class MSInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class MSSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class MSDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "DATETIME"
+class MSDate(sqltypes.Date):
+ def get_col_spec(self):
+ return "DATE"
+class MSTime(sqltypes.Time):
+ def get_col_spec(self):
+ return "TIME"
class MSText(sqltypes.TEXT):
def get_col_spec(self):
return "TEXT"
@@ -54,9 +63,12 @@
colspecs = {
sqltypes.Integer : MSInteger,
+ sqltypes.Smallinteger : MSSmallInteger,
sqltypes.Numeric : MSNumeric,
sqltypes.Float : MSFloat,
sqltypes.DateTime : MSDateTime,
+ sqltypes.Date : MSDate,
+ sqltypes.Time : MSTime,
sqltypes.String : MSString,
sqltypes.Binary : MSBinary,
sqltypes.Boolean : MSBoolean,
@@ -66,6 +78,7 @@
ischema_names = {
'int' : MSInteger,
+ 'smallint' : MSSmallInteger,
'varchar' : MSString,
'char' : MSChar,
'text' : MSText,
@@ -73,6 +86,8 @@
'float' : MSFloat,
'timestamp' : MSDateTime,
'datetime' : MSDateTime,
+ 'date' : MSDate,
+ 'time' : MSTime,
'binary' : MSBinary,
'blob' : MSBinary,
}
Index: lib/sqlalchemy/databases/oracle.py
===================================================================
--- lib/sqlalchemy/databases/oracle.py (revision 888)
+++ lib/sqlalchemy/databases/oracle.py (working copy)
@@ -24,9 +24,17 @@
class OracleInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class OracleSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class OracleDateTime(sqltypes.DateTime):
def get_col_spec(self):
return "DATE"
+# Note:
+# Oracle DATE == DATETIME
+# Oracle does not allow milliseconds in DATE
+# Oracle does not support TIME columns
+
class OracleText(sqltypes.TEXT):
def get_col_spec(self):
return "CLOB"
@@ -45,8 +53,10 @@
colspecs = {
sqltypes.Integer : OracleInteger,
+ sqltypes.Smallinteger : OracleSmallInteger,
sqltypes.Numeric : OracleNumeric,
sqltypes.DateTime : OracleDateTime,
+ sqltypes.Date : OracleDateTime,
sqltypes.String : OracleString,
sqltypes.Binary : OracleBinary,
sqltypes.Boolean : OracleBoolean,
Index: lib/sqlalchemy/databases/postgres.py
===================================================================
--- lib/sqlalchemy/databases/postgres.py (revision 888)
+++ lib/sqlalchemy/databases/postgres.py (working copy)
@@ -33,6 +33,9 @@
class PGInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+class PGSmallInteger(sqltypes.Smallinteger):
+ def get_col_spec(self):
+ return "SMALLINT"
class PG2DateTime(sqltypes.DateTime):
def get_col_spec(self):
return "TIMESTAMP"
@@ -46,6 +49,32 @@
return value
def get_col_spec(self):
return "TIMESTAMP"
+class PG2Date(sqltypes.Date):
+ def get_col_spec(self):
+ return "DATE"
+class PG1Date(sqltypes.Date):
+ def convert_bind_param(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python
DateTime/MXDateTime
+ # this one doesnt seem to work with the "emulation" mode
+ return psycopg.DateFromMx(value)
+ def convert_result_value(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python
DateTime/MXDateTime
+ return value
+ def get_col_spec(self):
+ return "DATE"
+class PG2Time(sqltypes.Date):
+ def get_col_spec(self):
+ return "TIME"
+class PG1Time(sqltypes.Date):
+ def convert_bind_param(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python
DateTime/MXDateTime
+ # this one doesnt seem to work with the "emulation" mode
+ return psycopg.TimeFromMx(value)
+ def convert_result_value(self, value, engine):
+ # TODO: perform appropriate postgres1 conversion between Python
DateTime/MXDateTime
+ return value
+ def get_col_spec(self):
+ return "TIME"
class PGText(sqltypes.TEXT):
def get_col_spec(self):
return "TEXT"
@@ -64,9 +93,12 @@
pg2_colspecs = {
sqltypes.Integer : PGInteger,
+ sqltypes.Smallinteger : PGSmallInteger,
sqltypes.Numeric : PGNumeric,
sqltypes.Float : PGFloat,
sqltypes.DateTime : PG2DateTime,
+ sqltypes.Date : PG2Date,
+ sqltypes.Time : PG2Time,
sqltypes.String : PGString,
sqltypes.Binary : PGBinary,
sqltypes.Boolean : PGBoolean,
@@ -74,11 +106,16 @@
sqltypes.CHAR: PGChar,
}
pg1_colspecs = pg2_colspecs.copy()
-pg1_colspecs[sqltypes.DateTime] = PG1DateTime
+pg1_colspecs.update({
+ sqltypes.DateTime : PG1DateTime,
+ sqltypes.Date : PG1Date,
+ sqltypes.Time : PG1Time
+ })
pg2_ischema_names = {
'integer' : PGInteger,
'bigint' : PGInteger,
+ 'smallint' : PGSmallInteger,
'character varying' : PGString,
'character' : PGChar,
'text' : PGText,
@@ -88,12 +125,18 @@
'double precision' : PGFloat,
'timestamp with time zone' : PG2DateTime,
'timestamp without time zone' : PG2DateTime,
+ 'date' : PG2Date,
+ 'time': PG2Time,
'bytea' : PGBinary,
'boolean' : PGBoolean,
}
pg1_ischema_names = pg2_ischema_names.copy()
-pg1_ischema_names['timestamp with time zone'] = \
- pg1_ischema_names['timestamp without time zone'] = PG1DateTime
+pg1_ischema_names.update({
+ 'timestamp with time zone' : PG1DateTime,
+ 'timestamp without time zone' : PG1DateTime,
+ 'date' : PG1Date,
+ 'time' : PG1Time
+ })
def engine(opts, **params):
return PGSQLEngine(opts, **params)
Index: lib/sqlalchemy/types.py
===================================================================
--- lib/sqlalchemy/types.py (revision 888)
+++ lib/sqlalchemy/types.py (working copy)
@@ -6,7 +6,8 @@
__all__ = [ 'TypeEngine', 'TypeDecorator', 'NullTypeEngine',
'INT', 'CHAR', 'VARCHAR', 'TEXT', 'FLOAT', 'DECIMAL',
- 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN',
'String', 'Integer', 'Numeric', 'Float', 'DateTime', 'Binary',
'Boolean', 'Unicode', 'NULLTYPE'
+ 'TIMESTAMP', 'DATETIME', 'CLOB', 'BLOB', 'BOOLEAN',
'String', 'Integer', 'Smallinteger',
+ 'Numeric', 'Float', 'DateTime', 'Date', 'Time', 'Binary',
'Boolean', 'Unicode', 'NULLTYPE'
]
import sqlalchemy.util as util
@@ -88,6 +89,10 @@
# seems to be not needed with SQLite, Postgres
pass
+class Smallinteger(Integer):
+ """ smallint datatype """
+ pass
+
class Numeric(NullTypeEngine):
def __init__(self, precision = 10, length = 2):
self.precision = precision
@@ -104,6 +109,12 @@
class DateTime(NullTypeEngine):
pass
+class Date(NullTypeEngine):
+ pass
+
+class Time(NullTypeEngine):
+ pass
+
class Binary(NullTypeEngine):
def __init__(self, length=None):
self.length = length
@@ -122,8 +133,11 @@
class DECIMAL(Numeric):pass
class INT(Integer):pass
INTEGER = INT
+class SMALLINT(Smallinteger):pass
class TIMESTAMP(DateTime): pass
class DATETIME(DateTime): pass
+class DATE(Date): pass
+class TIME(Time): pass
class CLOB(String): pass
class VARCHAR(String): pass
class CHAR(String):pass
-------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc. Do you grep through log files
for problems? Stop! Download the new AJAX search engine that makes
searching your log files as easy as surfing the web. DOWNLOAD SPLUNK!
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=103432&bid=230486&dat=121642
_______________________________________________
Sqlalchemy-users mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/sqlalchemy-users