"""Create web2py model (python code) to represent MS SQL Server tables.

Features:
* Uses ANSI standard INFORMATION_SCHEMA (might work with other RDBMS)
* Detects legacy "keyed" tables (not having an "id" PK)
* Handles 'funny' column names.  web2py requires all column names to be
  valid python identifiers; this script uses rname for column names that
  have spaces or are otherwise invalid python identifiers.
* Connects directly to running databases, no need to do a SQL dump
* Handles notnull, unique and referential constraints
* Detects most common datatypes and default values
* Supports running from the command line as well as from an IDE's debug
  menu.  See the COMMAND_LINE_MODE constant below for more info.

Requirements:
* Needs the pyodbc python connector

Created by Kyle Flanagan (posted to web2py-users, June 2016).  Based on
extract_pgsql_models by Mariano Reingart, which was based on a script to
"generate schemas from dbs" (mysql) by Alexandre Andrade.
"""
import ast
import re
import sys

__author__ = "Kyle Flanagan <[email protected]>"

HELP = """
USAGE: extract_mssql_models db host port user passwd
Call with SQL Server database connection parameters,
web2py model will be printed on standard output.

EXAMPLE: python extract_mssql_models.py mydb localhost 1433 kflanaga pass
or
python extract_mssql_models.py mydb localhost 1433 kflanaga pass > db_model.py
"""

# Config options
DEBUG = False    # print debug messages to STDERR
SCHEMA = 'dbo'
# Running from the command prompt.  Disable to specify the connection
# variables below instead and run from inside an IDE.
COMMAND_LINE_MODE = True
DB = None
HOST = None
USER = None
PASSWD = None
PORT = None

# Constant for Field keyword parameter order (and filter).  Keys set by
# define_field() but absent here are silently dropped from the output;
# 'update' is listed so timestamp columns get update=request.now.
KWARGS = ('type', 'length', 'default', 'update', 'required', 'ondelete',
          'notnull', 'unique', 'label', 'comment', 'rname')

# This is from pydal/helpers/regex.py as of 2016-06-16.
# Used to recognize whether a field name needs an rname representation.
REGEX_VALID_TB_FLD = re.compile(r'^[^\d_][_0-9a-zA-Z]*\Z')
# For replacing invalid characters in field names
INVALID_CHARS = re.compile(r'[^a-zA-Z0-9_]')


def get_valid_column_name(field):
    """Return a valid column name that follows Python's rules for
    identifiers, which is what web2py requires for column names.

    Replaces invalid characters with underscores and a leading digit
    with its associated English word.
    """
    if not REGEX_VALID_TB_FLD.match(field):
        # If the first character is a digit, replace it with its word
        # counterpart so the result starts with a letter.
        if re.match(r'^[0-9]', field):
            numbers = ['Zero', 'One', 'Two', 'Three', 'Four',
                       'Five', 'Six', 'Seven', 'Eight', 'Nine']
            field = numbers[int(field[0])] + field[1:]
        field = INVALID_CHARS.sub('_', field)
        # NOTE(review): a leading underscore is also rejected by pydal's
        # regex but is passed through unchanged here -- TODO confirm
        # whether such names occur in practice.
    return field


def query(conn, sql, *args):
    """Execute a SQL query and return rows as a list of dicts.

    Uses qmark-style parameter binding (the pyodbc default) so values
    are quoted/escaped by the driver instead of being interpolated into
    the SQL text.
    """
    cur = conn.cursor()
    ret = []
    try:
        if DEBUG:
            print("QUERY: ", sql, args, file=sys.stderr)
        cur.execute(sql, args)
        for row in cur:
            dic = {}
            for i, value in enumerate(row):
                field = cur.description[i][0]
                dic[field] = value
            if DEBUG:
                print("RET: ", dic, file=sys.stderr)
            ret.append(dic)
        return ret
    finally:
        cur.close()


def get_tables(conn, schema=SCHEMA):
    """List table names in a given schema."""
    rows = query(conn, """SELECT table_name FROM information_schema.tables
        WHERE table_schema = ?
        ORDER BY table_name""", schema)
    return [row['table_name'] for row in rows]


def get_fields(conn, table):
    """Retrieve the field list for a given table."""
    if DEBUG:
        print("Processing TABLE", table, file=sys.stderr)
    rows = query(conn, """
        SELECT column_name, data_type,
            is_nullable,
            character_maximum_length,
            numeric_precision, numeric_precision_radix, numeric_scale,
            column_default
        FROM information_schema.columns
        WHERE table_name = ?
        ORDER BY ordinal_position""", table)
    return rows


def define_field(conn, table, field, pks):
    """Determine field type, default value, references, etc.

    Returns a dict of Field keyword arguments (filtered later through
    KWARGS); values are source-code strings, not live objects.
    """
    f = {}
    ref = references(conn, table, field['column_name'])
    data_type = field['data_type']
    if ref:
        f.update(ref)
    elif field['column_default'] and \
            field['column_default'].startswith("nextval") and \
            field['column_name'] in pks:
        # Auto-incrementing primary key
        f['type'] = "'id'"
    elif data_type.startswith('character'):
        f['type'] = "'string'"
        if field['character_maximum_length']:
            f['length'] = field['character_maximum_length']
    elif data_type in ('text', 'ntext'):
        f['type'] = "'text'"
    elif data_type in ('boolean', 'bit'):
        f['type'] = "'boolean'"
    elif data_type in ('tinyint', 'smallint', 'bigint', 'int'):
        f['type'] = "'integer'"
    elif data_type in ('real', 'float'):
        f['type'] = "'double'"
    elif data_type in ('datetime', 'datetime2', 'smalldatetime'):
        f['type'] = "'datetime'"
    elif data_type in ('timestamp',):
        f['type'] = "'datetime'"
        f['default'] = "request.now"
        f['update'] = "request.now"
    elif data_type in ('date',):
        f['type'] = "'date'"
    elif data_type in ('time',):
        f['type'] = "'time'"
    elif data_type in ('numeric', 'money', 'smallmoney', 'decimal'):
        # pydal expects precision/scale embedded in the type string,
        # e.g. 'decimal(10,2)' -- a bare 'decimal' is rejected.
        f['type'] = "'decimal(%s,%s)'" % (field['numeric_precision'],
                                          field['numeric_scale'] or 0)
    elif data_type in ('binary', 'varbinary', 'image'):
        f['type'] = "'blob'"
    elif data_type in ('point', 'lseg', 'polygon', 'unknown',
                       'USER-DEFINED', 'sql_variant'):
        f['type'] = ""  # unsupported?
    elif data_type in ('varchar', 'char', 'nchar', 'nvarchar',
                       'uniqueidentifier', 'uniqueidentifer'):
        # ('uniqueidentifer' kept as an alias of the historical typo)
        f['type'] = "'string'"
    else:
        raise RuntimeError("Data Type not supported: %s " % str(field))

    try:
        if field['column_default']:
            if field['column_default'] == "now()":
                d = "request.now"
            elif field['column_default'] == "true":
                d = "True"
            elif field['column_default'] == "false":
                d = "False"
            else:
                # literal_eval parses only Python literals: safe, unlike
                # eval, on values read from the database.
                d = repr(ast.literal_eval(field['column_default']))
            f['default'] = str(d)
    except (ValueError, SyntaxError):
        pass  # unparseable default expression: omit the default
    except Exception:
        raise RuntimeError("Default unsupported '%s'" %
                           field['column_default'])

    # INFORMATION_SCHEMA reports is_nullable as the string 'YES'/'NO';
    # both are truthy, so an explicit comparison is required.
    if field['is_nullable'] == 'NO':
        f['notnull'] = "True"

    # For field names that are not valid python identifiers, add a
    # reference (rname) to their actual name in the back-end database.
    if not REGEX_VALID_TB_FLD.match(field['column_name']):
        f['rname'] = "'[%s]'" % field['column_name']

    return f


def is_unique(conn, table, field):
    """Find unique columns (incomplete support)."""
    rows = query(conn, """
        SELECT c.column_name
        FROM information_schema.table_constraints t
        INNER JOIN information_schema.constraint_column_usage c
        ON (t.CONSTRAINT_CATALOG = c.CONSTRAINT_CATALOG
            AND t.CONSTRAINT_NAME = c.CONSTRAINT_NAME
            AND t.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA
            AND t.TABLE_CATALOG = c.TABLE_CATALOG
            AND t.TABLE_NAME = c.TABLE_NAME
            AND t.TABLE_SCHEMA = c.TABLE_SCHEMA)
        WHERE t.table_name = ?
        AND c.column_name = ?
        AND t.constraint_type = 'UNIQUE'
        ;""", table, field['column_name'])
    return bool(rows)


def primarykeys(conn, table):
    """Find primary key column names for a table."""
    rows = query(conn, """
        SELECT c.column_name
        FROM information_schema.table_constraints t
        INNER JOIN information_schema.constraint_column_usage c
        ON (t.CONSTRAINT_CATALOG = c.CONSTRAINT_CATALOG
            AND t.CONSTRAINT_NAME = c.CONSTRAINT_NAME
            AND t.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA
            AND t.TABLE_CATALOG = c.TABLE_CATALOG
            AND t.TABLE_NAME = c.TABLE_NAME
            AND t.TABLE_SCHEMA = c.TABLE_SCHEMA)
        WHERE t.table_name = ?
        AND t.constraint_type = 'PRIMARY KEY'
        ;""", table)
    return [row['column_name'] for row in rows]


def references(conn, table, field):
    """Find a FK for *field* (fails if it is in multiple constraints).

    Returns a dict with 'type' (and possibly 'ondelete') keys, or None
    when the column is not part of a foreign key.
    """
    rows1 = query(conn, """
        SELECT k.table_name, k.column_name, k.constraint_name,
               r.update_rule, r.delete_rule, k.ordinal_position
        FROM information_schema.key_column_usage k
        INNER JOIN information_schema.referential_constraints r
        ON (k.CONSTRAINT_CATALOG = r.CONSTRAINT_CATALOG
            AND k.CONSTRAINT_NAME = r.CONSTRAINT_NAME
            AND k.CONSTRAINT_SCHEMA = r.CONSTRAINT_SCHEMA)
        INNER JOIN information_schema.table_constraints t
        ON (r.CONSTRAINT_CATALOG = t.CONSTRAINT_CATALOG
            AND r.CONSTRAINT_NAME = t.CONSTRAINT_NAME
            AND r.CONSTRAINT_SCHEMA = t.CONSTRAINT_SCHEMA)
        WHERE k.table_name = ?
        AND k.column_name = ?
        AND t.constraint_type = 'FOREIGN KEY'
        ;""", table, field)
    if len(rows1) == 1:
        rows2 = query(conn, """
            SELECT table_name, column_name, *
            FROM information_schema.constraint_column_usage
            WHERE constraint_name = ?
            """, rows1[0]['constraint_name'])
        row = None
        if len(rows2) > 1:
            # Composite key: pick the referenced column by position.
            row = rows2[int(rows1[0]['ordinal_position']) - 1]
            keyed = True
        if len(rows2) == 1:
            row = rows2[0]
            keyed = False
        if row:
            if keyed:  # THIS IS BAD, DON'T MIX "id" and primarykey!!!
                ref = {'type': "'reference %s.%s'" % (row['table_name'],
                                                      row['column_name'])}
            else:
                ref = {'type': "'reference %s'" % (row['table_name'],)}
            if rows1[0]['delete_rule'] != "NO ACTION":
                ref['ondelete'] = repr(rows1[0]['delete_rule'])
            return ref
        elif rows2:
            raise RuntimeError("Unsupported foreign key reference: %s" %
                               str(rows2))
    elif rows1:
        raise RuntimeError("Unsupported referential constraint: %s" %
                           str(rows1))


def define_table(conn, table):
    """Output a single table definition."""
    fields = get_fields(conn, table)
    pks = primarykeys(conn, table)
    print("db.define_table('%s'," % (table,))
    for field in fields:
        fname = field['column_name']
        fdef = define_field(conn, table, field, pks)
        if fname not in pks and is_unique(conn, table, field):
            fdef['unique'] = "True"
        if fdef['type'] == "'id'" and fname in pks:
            # 'id' fields are implicit PKs in web2py: don't list them.
            pks.remove(fname)
        print("    Field('%s', %s)," % (
            get_valid_column_name(fname),
            ', '.join(["%s=%s" % (k, fdef[k]) for k in KWARGS
                       if k in fdef and fdef[k]])))
    if pks:
        print("    primarykey=[%s]," % ", ".join("'%s'" % pk for pk in pks))
    print("    migrate=migrate)")
    print()


def define_db(conn, db, host, port, user, passwd):
    """Output database definition (model)."""
    dal = ('db = DAL("mssql4://%s:%s@%s:%s/%s", pool_size=10, '
           'decode_credentials=True)')
    # '@' and ':' inside the credentials would break the URI, so they
    # are URL-encoded; decode_credentials=True undoes this in the DAL.
    print(dal % (user.replace('@', '%40').replace(':', '%3A'),
                 passwd.replace('@', '%40').replace(':', '%3A'),
                 host, port, db))
    print()
    print("migrate = False")
    print()
    for table in get_tables(conn):
        define_table(conn, table)


if __name__ == "__main__":
    if len(sys.argv) < 6 and COMMAND_LINE_MODE:
        print(HELP)
    else:
        # Parse arguments from the command line:
        if COMMAND_LINE_MODE:
            db, host, port, user, passwd = sys.argv[1:6]
        else:
            db = DB
            host = HOST
            user = USER
            passwd = PASSWD
            port = PORT

        # Make the database connection (change driver if required; the
        # {{ }} in the format string emit literal braces).
        import pyodbc
        cnn = pyodbc.connect(
            r'DRIVER={{SQL Server Native Client 11.0}};'
            r'SERVER={server};PORT={port};DATABASE={db};'
            r'UID={user};PWD={passwd}'.format(
                server=host, port=port, db=db, user=user, passwd=passwd)
        )
        # Start model code generation:
        define_db(cnn, db, host, port, user, passwd)
-- Resources: - http://web2py.com - http://web2py.com/book (Documentation) - http://github.com/web2py/web2py (Source code) - https://code.google.com/p/web2py/issues/list (Report Issues) --- You received this message because you are subscribed to the Google Groups "web2py-users" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. For more options, visit https://groups.google.com/d/optout.

