Here is my firebird backend for review.

Along with coding style stuff and silly bugs, I'd like to know if:

the CAST problem is solved in an acceptable way. I hesitated
between the current @compile.when(MightCast) approach and multiple
@compile.when(datatype) functions.

The URI resolver has no magic to discover absolute/relative database paths.
For example: the database foobar.fdb in the root of the filesystem has
to be accessed with firebird://u:[EMAIL PROTECTED]//foobar.fdb, not
firebird://u:[EMAIL PROTECTED]/foobar.fdb
but firebird://u:[EMAIL PROTECTED]/foobar.fdb is good for whatever
foobar.fdb is the alias for (configured in firebird's aliases.conf).
Does someone know a good way to make this behave less weird?

test_execute_expression_datatypes and
test_execute_expression_multiple_values might be added as generic
tests, I suppose other backends can pass these easily.

backend.py should be: storm/databases/firebird.py
database_test.py should be: tests/databases/firebird.py
store_test.py should be: tests/store/firebird.py

now, fire away with complaints ;-)

-- 
Mvg
Môshe van der Sterre
http://www.moshe.nl/
http://www.coecu.nl/
#
# Copyright (c) 2007 Moshe van der Sterre <[EMAIL PROTECTED]>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
import sys
from datetime import datetime, date, time, timedelta

from storm.databases import dummy

from storm.exceptions import install_exceptions, DatabaseModuleError

from storm.database import Connection, Database, Result
from storm.expr import (Column, COLUMN, EXPR, Expr, Select, SELECT, Undef,
                        SQL, SQLRaw, SetExpr, TABLE, Union, Except, Intersect,
                        build_tables, compile, compile_select, State,
                        compile_set_expr, has_tables, And, Eq)
from storm.variables import IntVariable

# kinterbasdb is optional at import time; when it is missing the dummy
# module is bound to the name instead.
try:
    import kinterbasdb
except ImportError:
    kinterbasdb = dummy
else:
    # Only reached when the real driver was imported.
    install_exceptions(kinterbasdb)
    # type_conv=200 fixes datetime and unicode behavior
    kinterbasdb.init(type_conv=200)

# The @compile.when handlers below are registered on this fork.
compile = compile.fork()

@compile.when(Select)
def compile_select_firebird(compile, state, select):
    """Compile a SELECT statement using Firebird's syntax.

    This does not use compile_select due to large differences:

    - limit and offset are written as "SELECT FIRST 10 SKIP 20";
    - values used as columns are wrapped in MightCast, because
      parameters as columns do not work unless casted;
    - RDB$DATABASE is used as the table when the statement has none.

    The Select expression passed in is left unmodified.

    @param compile: The compiler used for the sub-expressions.
    @param state: The compilation state collecting the parameters.
    @param select: The Select expression to compile.
    @return: The SQL statement as a string.
    """
    tokens = ["SELECT "]
    if select.limit is not Undef:
        tokens.append("FIRST %d " % select.limit)
    if select.offset is not Undef:
        # The trailing space separates the offset from whatever follows
        # (DISTINCT or the first column).
        tokens.append("SKIP %d " % select.offset)
    if select.distinct:
        tokens.append("DISTINCT ")
    state.push("auto_tables", [])
    state.push("context", COLUMN)
    # Wrap the columns in a local instead of assigning back to the
    # expression, so the caller's Select is not altered by compiling it.
    if type(select.columns) in (list, tuple):
        columns = [MightCast(value) for value in select.columns]
    else:
        columns = MightCast(select.columns)
    tokens.append(compile(state, columns))
    # The FROM clause is compiled last (it depends on auto_tables), but
    # its text and parameters have to be placed right after the columns.
    tables_pos = len(tokens)
    parameters_pos = len(state.parameters)
    state.context = EXPR
    if select.where is not Undef:
        tokens.append(" WHERE ")
        tokens.append(compile(state, select.where, raw=True))
    # GROUP BY has to come before ORDER BY in the statement.
    if select.group_by is not Undef:
        tokens.append(" GROUP BY ")
        tokens.append(compile(state, select.group_by, raw=True))
    if select.order_by is not Undef:
        tokens.append(" ORDER BY ")
        tokens.append(compile(state, select.order_by, raw=True))
    default_tables = select.default_tables
    if not has_tables(state, select):
        # Firebird has no SELECT without FROM.
        default_tables = "RDB$DATABASE"
    state.context = TABLE
    # Collect the parameters of the FROM clause separately, so they can
    # be spliced in at the position matching the clause's text.
    state.push("parameters", [])
    tokens.insert(tables_pos, " FROM ")
    tokens.insert(tables_pos + 1, build_tables(compile, state, select.tables,
                                               default_tables))
    parameters = state.parameters
    state.pop()
    state.parameters[parameters_pos:parameters_pos] = parameters
    state.pop()
    state.pop()
    return "".join(tokens)

class MightCast(object):
    """Wrapper marking a value that may need a CAST when compiled.

    The wrapped object is available unchanged as the C{value} attribute.
    """

    def __init__(self, value):
        self.value = value

def _varchar_length(value):
    """Return the VARCHAR length to cast value to (at least 1).

    An empty string would otherwise yield VARCHAR(0), which is not a
    valid Firebird type.
    """
    return max(len(value), 1)


# Maps a Python type to a tuple holding the CAST template and, for types
# that need one, a callable computing the template's second argument from
# the value being cast.
castfuncs = {
    str: ("CAST(%s AS VARCHAR(%i))", _varchar_length),
    unicode: ("CAST(%s AS VARCHAR(%i) CHARACTER SET UTF8)", _varchar_length),
    int: ("CAST(%s AS INT)",),
    # Firebird's 64 bit integer type is named BIGINT.
    long: ("CAST(%s AS BIGINT)",),
    # Firebird has no bare DOUBLE type; the name is DOUBLE PRECISION.
    float: ("CAST(%s AS DOUBLE PRECISION)",),
    bool: ("CAST(%s AS INT)",),
    datetime: ("CAST(%s AS TIMESTAMP)",),
    date: ("CAST(%s AS DATE)",),
    time: ("CAST(%s AS TIME)",),
    # NOTE(review): a timedelta is a duration, not a point in time;
    # confirm that casting it to TIMESTAMP is really intended.
    timedelta: ("CAST(%s AS TIMESTAMP)",),
}

# Firebird needs parameters to be cast when in a SELECT
@compile.when(MightCast)
def compile_mightcast(compile, state, expr):
    """Compile the value wrapped by a MightCast expression.

    When the exact type of the value has an entry in castfuncs, the
    compiled value is enclosed in the matching CAST; otherwise it is
    compiled as is.
    """
    value = expr.value
    cast_spec = castfuncs.get(type(value))
    if cast_spec is None:
        return compile(state, value)
    template = cast_spec[0]
    compiled = compile(state, value)
    if len(cast_spec) > 1:
        # The second entry computes the extra template argument
        # (the VARCHAR length) from the value itself.
        return template % (compiled, cast_spec[1](value))
    return template % compiled

class FirebirdConnection(Connection):
    """Connection using the Firebird-specific compiler of this module."""

    # The compiler forked in this module, with the Firebird handlers.
    _compile = compile
    # The plain Result is used rather than a custom FirebirdResult,
    # because firebird does not support any kind of insert identity.
    _result_factory = Result


class Firebird(Database):
    """Database backend for Firebird, using the kinterbasdb driver."""

    _connection_factory = FirebirdConnection

    def __init__(self, uri):
        """Collect the connection arguments from the given URI.

        @param uri: URI object providing database, host, port, username
            and password; parts that are None are left out.
        @raise DatabaseModuleError: If kinterbasdb could not be imported.
        """
        if kinterbasdb is dummy:
            raise DatabaseModuleError("'kinterbasdb' module not found")
        self._connect_kwargs = {}
        if uri.database is not None:
            self._connect_kwargs["database"] = uri.database
        if uri.host is not None:
            host = uri.host
            if uri.port is not None:
                # Firebird takes a non-default port as "host/port".
                host = "%s/%s" % (host, uri.port)
            # kinterbasdb.connect() names this keyword "host".
            self._connect_kwargs["host"] = host
        if uri.username is not None:
            self._connect_kwargs["user"] = uri.username
        if uri.password is not None:
            self._connect_kwargs["password"] = uri.password

    def connect(self):
        """Open a new kinterbasdb connection wrapped in a Storm connection."""
        raw_connection = kinterbasdb.connect(**self._connect_kwargs)
        return self._connection_factory(self, raw_connection)


create_from_uri = Firebird

#
# Copyright (c) 2007 Moshe van der Sterre <[EMAIL PROTECTED]>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

from datetime import datetime, date, time, timedelta
import os

from storm.uri import URI
from storm.database import create_database, Result
from storm.variables import UnicodeVariable, DateTimeVariable, TimeVariable

from storm.variables import ListVariable, IntVariable, Variable
from storm.expr import Union, Select, Alias, SQLRaw

from tests.databases.base import DatabaseTest, UnsupportedDatabaseTest
from tests.helper import TestHelper, MakePath


class FirebirdTest(DatabaseTest, TestHelper):
    """Database tests run against the database in STORM_FIREBIRD_URI."""

    def is_supported(self):
        """Report whether STORM_FIREBIRD_URI is set to a non-empty value."""
        return bool(os.environ.get("STORM_FIREBIRD_URI"))

    def create_database(self):
        self.database = create_database(os.environ["STORM_FIREBIRD_URI"])

    def create_tables(self):
        # "CHARACTER SET UTF8 COLLATE UNICODE" is needed for
        # test_execute_unicode_result
        self.connection.execute("CREATE TABLE test (id INT UNIQUE, "
                                "title VARCHAR(100) "
                                "CHARACTER SET UTF8 COLLATE UNICODE)")
        self.connection.execute("CREATE TABLE datetime_test (id INT UNIQUE, "
                                "dt TIMESTAMP, d DATE, t TIME)")
        self.connection.execute("CREATE TABLE bin_test "
                                "(id INT UNIQUE, b BLOB)")
        self.connection.commit()

    # We override the following tests because
    # they have static SQL that doesn't work with firebird
    def test_execute_result(self):
        """The base test does a SELECT without a FROM"""
        result = self.connection.execute("SELECT 1 FROM RDB$DATABASE")
        self.assertTrue(isinstance(result, Result))
        self.assertTrue(result.get_one())

    def test_execute_params(self):
        """The base test does a SELECT without a FROM"""
        result = self.connection.execute("SELECT 1 FROM (SELECT 1 AS NUM FROM"
                                    " RDB$DATABASE) AS ALIAS WHERE 1=?", (1,))
        self.assertTrue(result.get_one())
        result = self.connection.execute("SELECT 1 FROM (SELECT 1 AS NUM FROM"
                                    " RDB$DATABASE) AS ALIAS WHERE 1=?", (2,))
        self.assertFalse(result.get_one())

    def test_execute_empty_params(self):
        """The base test does a SELECT without a FROM"""
        result = self.connection.execute("SELECT 1 FROM RDB$DATABASE", ())
        self.assertTrue(result.get_one())

    # We override the following tests because firebird stores fractions
    # of a second with a resolution of 100 microseconds
    def test_datetime(self):
        """Firebird does not have microsecond resolution,
           it has a resolution of 100 microseconds"""
        value = datetime(1977, 4, 5, 12, 34, 56, 7800)
        self.connection.execute("INSERT INTO datetime_test (dt) VALUES (?)",
                                (value,))
        result = self.connection.execute("SELECT dt FROM datetime_test")
        variable = DateTimeVariable()
        result.set_variable(variable, result.get_one()[0])
        if not self.supports_microseconds:
            value = value.replace(microsecond=0)
        self.assertEquals(variable.get(), value)

    def test_time(self):
        """Firebird does not have microsecond resolution,
           it has a resolution of 100 microseconds"""
        value = time(12, 34, 56, 7800)
        self.connection.execute("INSERT INTO datetime_test (t) VALUES (?)",
                                (value,))
        result = self.connection.execute("SELECT t FROM datetime_test")
        variable = TimeVariable()
        result.set_variable(variable, result.get_one()[0])
        if not self.supports_microseconds:
            value = value.replace(microsecond=0)
        self.assertEquals(variable.get(), value)

    def test_get_insert_identity(self):
        """Firebird does not support insert identities in any way"""
        pass

    def test_get_insert_identity_composed(self):
        """Firebird does not support insert identities in any way"""
        pass

    # Additional tests
    def test_execute_expression_datatypes(self):
        """Test generated expressions for all datatypes"""
        values = (1, "1", u"1", time(12, 34, 56, 7800),
                  datetime(1977, 4, 5, 12, 34, 56, 7800))
        for value in values:
            result = self.connection.execute(Select(value))
            self.assertEquals(result.get_one(), (value,))

    def test_execute_expression_multiple_values(self):
        """Test a generated expression with multiple values"""
        values = (1, "1", u"1", time(12, 34, 56, 7800),
                  datetime(1977, 4, 5, 12, 34, 56, 7800))
        result = self.connection.execute(Select(values))
        self.assertEquals(result.get_one(), values)


class FirebirdUnsupportedTest(UnsupportedDatabaseTest, TestHelper):
    """UnsupportedDatabaseTest configured for the firebird backend."""

    # Name of the storm.databases module under test.
    db_module_name = "firebird"
    # Name of the DB-API driver module that backend imports.
    dbapi_module_name = "kinterbasdb"
#
# Copyright (c) 2007 Moshe van der Sterre <[EMAIL PROTECTED]>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
from storm.uri import URI

from tests.store.base import StoreTest, EmptyResultSetTest, Foo
from tests.helper import TestHelper, MakePath
from storm.database import Result, create_database
from storm.properties import Int
from storm.locals import SQL
import os
import gc

class FirebirdStoreTest(TestHelper, StoreTest):
    """Store tests run against the database in STORM_FIREBIRD_URI."""

    helpers = [MakePath]

    def setUp(self):
        TestHelper.setUp(self)
        StoreTest.setUp(self)

    def tearDown(self):
        TestHelper.tearDown(self)
        StoreTest.tearDown(self)

    def is_supported(self):
        """Report whether STORM_FIREBIRD_URI is set to a non-empty value.

        create_database() reads the variable with os.environ[...], which
        raises KeyError when it is not set.
        """
        return bool(os.environ.get("STORM_FIREBIRD_URI"))

    def create_database(self):
        self.database = create_database(os.environ["STORM_FIREBIRD_URI"])

    def create_tables(self):
        connection = self.database.connect()
        connection.execute("CREATE TABLE foo (id INTEGER, "
                           "title VARCHAR(200) "
                           "CHARACTER SET UTF8 COLLATE UNICODE)")
        connection.execute("CREATE TABLE bar (id INTEGER, "
                           "foo_id INTEGER, "
                           "title VARCHAR(200) "
                           "CHARACTER SET UTF8 COLLATE UNICODE)")
        connection.execute("CREATE TABLE bin "
                           "(id INTEGER, bin BLOB)")
        connection.execute("CREATE TABLE link "
                           "(foo_id INTEGER, bar_id INTEGER)")
        connection.commit()

    def drop_tables(self):
        # NOTE(review): this commits on a freshly opened connection before
        # delegating to the base class; confirm that it is really needed.
        connection = self.database.connect()
        connection.commit()
        StoreTest.drop_tables(self)

    def test_execute(self):
        """The base test does a SELECT without a FROM"""
        result = self.store.execute("SELECT 1 FROM RDB$DATABASE")
        self.assertTrue(isinstance(result, Result))
        self.assertEquals(result.get_one(), (1,))

        result = self.store.execute("SELECT 1 FROM RDB$DATABASE",
                                    noresult=True)
        self.assertEquals(result, None)

    def test_execute_params(self):
        """The base test does a SELECT without a FROM"""
        result = self.store.execute("SELECT CAST(? AS INT) FROM RDB$DATABASE",
                                    [1])
        self.assertTrue(isinstance(result, Result))
        self.assertEquals(result.get_one(), (1,))

    def test_add_and_stop_referencing(self):
        """Firebird does not support insert identities in any way,
           this test should be implemented with explicit sequences"""
        pass


class FirebirdEmptyResultSetTest(TestHelper, EmptyResultSetTest):
    """Empty result set tests run against the STORM_FIREBIRD_URI database."""

    helpers = [MakePath]

    def setUp(self):
        TestHelper.setUp(self)
        EmptyResultSetTest.setUp(self)

    def tearDown(self):
        TestHelper.tearDown(self)
        EmptyResultSetTest.tearDown(self)

    def is_supported(self):
        """Report whether STORM_FIREBIRD_URI is set to a non-empty value.

        create_database() reads the variable with os.environ[...], which
        raises KeyError when it is not set.
        """
        return bool(os.environ.get("STORM_FIREBIRD_URI"))

    def create_database(self):
        self.database = create_database(os.environ["STORM_FIREBIRD_URI"])

    def create_tables(self):
        connection = self.database.connect()
        connection.execute("CREATE TABLE foo "
                           "(id INTEGER PRIMARY KEY,"
                           " title VARCHAR(200))")
        connection.commit()
-- 
storm mailing list
[email protected]
Modify settings or unsubscribe at: 
https://lists.ubuntu.com/mailman/listinfo/storm

Reply via email to