You're right, of course. Adding has_type() to OracleDialect is more a matter of
taste than a necessity.
Thanks again for all your help. I'm actually amazed at how well and how
transparently it works, given the cx_Oracle limitations.
_____________________________
From: Mike Bayer <[email protected]>
Sent: Monday, October 3, 2016 5:54 PM
Subject: Re: [sqlalchemy] Feedback appreciated
To: <[email protected]>
On 10/03/2016 05:21 PM, Seth P wrote:
>
>
> On Friday, September 30, 2016 at 7:09:09 PM UTC-4, Mike Bayer wrote:
>
> the bind_expression() hook is here to allow you to re-render the
> expression. assuming value-bound bindparam() objects (e.g. not like
> you'd get with an INSERT or UPDATE usually), the value should be
> present
> and you can do this (had to work up a POC):
>
> from sqlalchemy import *
> from sqlalchemy.types import UserDefinedType
>
>
> class T(UserDefinedType):
>
> def bind_expression(self, colexpr):
> return literal_column(colexpr.value) # or whatever is
> needed here
>
> t = table('t', column('x', T()))
>
> print t.select().where(t.c.x == 'hi')
>
>
>
>
> >
> > Also, is there a way, inside VARRAY.__init__() or some other place
> that
> > is called before table creation to specify the
> sa.event.listen(<table>,
> > "before_create", self.create_ddl().execute_if(dialect='oracle'))?
>
>
> look into adding SchemaType as a mixin, it signals to the owning Column
> that it should receive events. You can then add the events to your
> type itself like before_parent_attach which should fire for the Column.
>
>
>
> OK, I got it working pretty much as desired.
>
> Adding
>
> def bind_expression(self, bindvalue):
> return sa.literal_column(self.process_literal_param(bindvalue.value,
> None), self)
>
> makes insert and update statements work.
>
> I also got the drop/create business working automatically by copying
> code from the Postgresql ENUM implementation, though it seems like an
> excessive amount of boilerplate. To keep the code as-is I had to
> monkey-patch OracleDialect to add a has_type() method -- any chance
> you'd want to add that for 1.1?
>
> import six
> import sqlalchemy as sa
>
>
> # Monkey-patch OracleDialect to have a has_type() method
> from sqlalchemy.dialects.oracle.base import OracleDialect
>
> def has_type(self, connection, type_name, schema=None):
> if not schema:
> schema = self.default_schema_name
> cursor = connection.execute(
> sa.sql.text("SELECT type_name FROM all_types "
> "WHERE type_name = :name AND owner = :schema_name"),
> name=self.denormalize_name(type_name),
> schema_name=self.denormalize_name(schema))
> return cursor.first() is not None
>
> OracleDialect.has_type = has_type
>
I can see that a full implementation in SQLA would benefit from an
OracleDialect.has_type() method, but in the case here of an external
implementation, I don't see why has_type() has to be on the OracleDialect.
You've got the bind right there and you're calling other SQL on it
inline within your type.
>
> class VARRAY(sa.types.UserDefinedType, sa.types.SchemaType):
>
> def __init__(self, name, size_limit, item_type, nullable=True,
> as_tuple=False,
> inherit_schema=True, create_type=True, **kw):
> sa.types.UserDefinedType.__init__(self)
> sa.types.SchemaType.__init__(self, name=name,
> inherit_schema=inherit_schema, **kw)
> self.size_limit = size_limit
> self.item_type = item_type
> self.nullable = nullable
> self.as_tuple = as_tuple
> self.create_type = create_type
>
> def get_col_spec(self):
> return (self.schema + '.' + self.name) if self.schema else self.name
>
> def compile(self, dialect=None):
> return (self.schema + '.' + self.name) if self.schema else self.name
>
> def create(self, bind=None, checkfirst=False):
> if not checkfirst or \
> not bind.dialect.has_type(
> bind, self.name, schema=self.schema):
> sql = "CREATE TYPE {} AS VARRAY({}) OF
> {}".format(self.compile(dialect=bind.dialect),
> self.size_limit,
>
> self.item_type.compile(dialect=bind.dialect))
> if not self.nullable:
> sql += " NOT NULL"
> bind.execute(sql)
>
> def drop(self, bind=None, checkfirst=False):
> if not checkfirst or \
> bind.dialect.has_type(bind, self.name, schema=self.schema):
> bind.execute("DROP TYPE " + self.compile(dialect=bind.dialect))
>
> def _check_for_name_in_memos(self, checkfirst, kw):
> """Look in the 'ddl runner' for 'memos', then
> note our name in that collection.
>
> This is to ensure a particular named VARRAY type is operated
> upon only once within any kind of create/drop
> sequence without relying upon "checkfirst".
> """
> if not self.create_type:
> return True
> if '_ddl_runner' in kw:
> ddl_runner = kw['_ddl_runner']
> if '_oc_varrays' in ddl_runner.memo:
> pg_enums = ddl_runner.memo['_oc_varrays']
> else:
> pg_enums = ddl_runner.memo['_oc_varrays'] = set()
> present = self.name in pg_enums
> pg_enums.add(self.name)
> return present
> else:
> return False
>
> def _on_table_create(self, target, bind, checkfirst, **kw):
> if checkfirst or (
> not self.metadata and
> not kw.get('_is_metadata_operation', False)) and \
> not self._check_for_name_in_memos(checkfirst, kw):
> self.create(bind=bind, checkfirst=checkfirst)
>
> def _on_table_drop(self, target, bind, checkfirst, **kw):
> if not self.metadata and \
> not kw.get('_is_metadata_operation', False) and \
> not self._check_for_name_in_memos(checkfirst, kw):
> self.drop(bind=bind, checkfirst=checkfirst)
>
> def _on_metadata_create(self, target, bind, checkfirst, **kw):
> if not self._check_for_name_in_memos(checkfirst, kw):
> self.create(bind=bind, checkfirst=checkfirst)
>
> def _on_metadata_drop(self, target, bind, checkfirst, **kw):
> if not self._check_for_name_in_memos(checkfirst, kw):
> self.drop(bind=bind, checkfirst=checkfirst)
>
> def process_literal_param(self, value, dialect):
> return "{}({})".format(self.compile(dialect=dialect),
> ','.join("NULL" if x is None else
> ("'%s'" % x) if isinstance(x, six.string_types) else str(x)
> for x in value))
>
> def literal_processor(self, dialect):
> def processor(value):
> return self.process_literal_param(value, dialect)
> return processor
>
> def bind_expression(self, bindvalue):
> return sa.literal_column(self.process_literal_param(bindvalue.value,
> None), self)
>
> def process_result_value(self, value, dialect):
> if self.as_tuple:
> value = tuple(value)
> return value
>
> def result_processor(self, dialect, coltype):
> def processor(value):
> return self.process_result_value(value, dialect)
> return processor
>
> def adapt(self, impltype, **kw):
> return sa.types.SchemaType.adapt(self, impltype,
> size_limit=self.size_limit,
> item_type=self.item_type,
> nullable=self.nullable,
> as_tuple=self.as_tuple,
> **kw)
>
>
> if __name__ == '__main__':
>
> uri = "oracle://user:password@host"
>
> import alchy
> import sqlalchemy.dialects.oracle as oc
>
> db = alchy.Manager(config={'SQLALCHEMY_DATABASE_URI': uri,
> 'SQLALCHEMY_ECHO': True})
>
> class TestVarray(db.Model):
> __tablename__ = 'test_varray'
> __table_args__ = { 'schema': 'barra' }
> idx = sa.Column(sa.Integer, primary_key=True)
> label = sa.Column(sa.String(20), nullable=False)
> words = sa.Column(VARRAY("tp_test_varray_words", 3000, sa.String(8),
> nullable=True, schema='barra'), nullable=False)
> numbers = sa.Column(VARRAY("tp_test_varray_numbers", 3000,
> oc.NUMBER(), nullable=True, inherit_schema=True), nullable=False)
>
> db.drop_all()
> db.create_all()
>
> db.engine.execute(TestVarray.__table__.insert({'idx': 1,
> 'label': 'One',
> 'words': ['Once', 'upon',
> 'a', 'time'],
> 'numbers': [1.1, 1.2]}).
> compile(compile_kwargs={"literal_binds": True}))
> db.engine.execute(TestVarray.__table__.insert({'idx': 2,
> 'label': 'Two',
> 'words': ['To', 'be',
> 'or', 'not'],
> 'numbers': [2.1, 2.2]}))
> db.engine.execute(TestVarray.__table__.update().
> where(TestVarray.__table__.c.idx == 1).
> values(numbers=[1.1111, 1.2222]))
>
> print TestVarray.query.all()
> print db.session().query(TestVarray.label, TestVarray.words,
> TestVarray.numbers).all()
>
> [<TestVarray(idx=1, label='One', words=['Once', 'upon', 'a', 'time'],
> numbers=[1.1111, 1.2222])>,
> <TestVarray(idx=2, label='Two', words=['To', 'be', 'or', 'not'],
> numbers=[2.1, 2.2])>]
> [('One', ['Once', 'upon', 'a', 'time'], [1.1111, 1.2222]),
> ('Two', ['To', 'be', 'or', 'not'], [2.1, 2.2])]
>
>
>
>
> --
> You received this message because you are subscribed to the Google
> Groups "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to [email protected]
> <mailto:[email protected]>.
> To post to this group, send email to [email protected]
> <mailto:[email protected]>.
> Visit this group at https://groups.google.com/group/sqlalchemy.
> For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google
Groups "sqlalchemy" group.
To unsubscribe from this topic, visit
https://groups.google.com/d/topic/sqlalchemy/BGWLUjaFtpY/unsubscribe.
To unsubscribe from this group and all its topics, send an email to
[email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.