After little juggling with upper and lower case for table name, it fails just one test ERROR: testtoengine (__main__.EngineTest) because I don't have postgres installed.
Oh, and I've changed the func.current_date() into func.sysdate() to pass the testbasic. On 21/02/06, Michael Bayer <[EMAIL PROTECTED]> wrote: > excellent ! i knew someone would do it. adding this to my TODO. > you can try the unit test 'python test/engines.py --db oracle' to > give it a shot on your end. >
Index: lib/sqlalchemy/databases/oracle.py =================================================================== --- lib/sqlalchemy/databases/oracle.py (revision 1012) +++ lib/sqlalchemy/databases/oracle.py (working copy) @@ -65,6 +65,16 @@ sqltypes.CHAR: OracleChar, } + +ischema_names = { + 'VARCHAR2' : OracleString, + 'DATE' : OracleDateTime, + 'DATETIME' : OracleDateTime, + 'NUMBER' : OracleNumeric, + 'BLOB' : OracleBinary, + 'CLOB' : OracleText +} + def engine(*args, **params): return OracleSQLEngine(*args, **params) @@ -113,8 +123,60 @@ return OracleDefaultRunner(self, proxy) def reflecttable(self, table): - raise "not implemented" + #raise "not implemented" + c = self.execute ("select COLUMN_NAME, DATA_TYPE, DATA_LENGTH, DATA_PRECISION, DATA_SCALE, NULLABLE, DATA_DEFAULT from USER_TAB_COLUMNS where TABLE_NAME = :table_name", {'table_name':table.name.upper()}) + + while True: + row = c.fetchone() + if row is None: + break + (name, coltype, length, precision, scale, nullable, default) = (row[0], row[1], row[2], row[3], row[4], row[5]=='Y', row[6]) + + # INTEGER if the scale is 0 and precision is null + # NUMBER if the scale and precision are both null + # NUMBER(9,2) if the precision is 9 and the scale is 2 + # NUMBER(3) if the precision is 3 and scale is 0 + #length is ignored except for CHAR and VARCHAR2 + if coltype=='NUMBER' : + if precision is None and scale is None: + coltype = OracleNumeric + elif precision is None and scale == 0 : + coltype = OracleInteger + else : + coltype = OracleNumeric(precision, scale) + elif coltype=='CHAR' or coltype=='VARCHAR2': + coltype = ischema_names.get(coltype, OracleString)(length) + else: + coltype = ischema_names.get(coltype) + + colargs = [] + if default is not None: + colargs.append(PassiveDefault(sql.text(default, escape=False))) + + name = name.lower() + + table.append_item (schema.Column(name, coltype, nullable=nullable, *colargs)) + + + c = self.execute("""select UCC.CONSTRAINT_NAME, UCC.COLUMN_NAME, UC.CONSTRAINT_TYPE, UC.SEARCH_CONDITION, UC2.TABLE_NAME as REFERENCES_TABLE +from USER_CONS_COLUMNS UCC, USER_CONSTRAINTS UC, USER_CONSTRAINTS UC2 +where UCC.CONSTRAINT_NAME = UC.CONSTRAINT_NAME +and UC.R_CONSTRAINT_NAME = UC2.CONSTRAINT_NAME(+) +and UCC.TABLE_NAME = :table_name +order by UCC.CONSTRAINT_NAME""",{'table_name' : table.name.upper()}) + while True: + row = c.fetchone() + if row is None: + break + + (cons_name, column_name, type, search, referred_table) = row + if type=='P' : + table.c[column_name.lower()]._set_primary_key() + elif type=='R': + remotetable = Table(referred_table.lower(), self, autoload = True) + table.c[column_name.lower()].append_item(schema.ForeignKey(remotetable.primary_key[0])) + def last_inserted_ids(self): return self.context.last_inserted_ids