Nice. In trunk!

On Thursday, 16 June 2016 16:54:20 UTC-5, Kyle Flanagan wrote:
>
> I needed the ability to extract a DAL model from SQL Server. I based the 
> script below on extract_pgsql_models by Mariano Reingart, based in turn on 
> a script to "generate schemas from dbs" (mysql) by Alexandre Andrade. I'm 
> posting it here in hopes that it will be useful for others.
>
> Tested on Windows 7 on a SQL Server 2012 database. Make sure to replace 
> "mssql4://..." with the appropriate connection string if using a legacy SQL 
> Server database. You might also need to modify the pyodbc.connect 
> connection string driver to match a driver on your local PC.
>
> """Create web2py model (python code) to represent MS SQL Server tables.
> Features:
> * Uses ANSI Standard INFORMATION_SCHEMA (might work with other RDBMS)
> * Detects legacy "keyed" tables (not having an "id" PK)
> * Handles 'funny' column names. web2py requires all column names be valid
>   python identifiers. This script uses rname for column names that have
>   spaces or are otherwise invalid python identifiers.
> * Connects directly to running databases, no need to do a SQL dump
> * Handles notnull, unique and referential constraints
> * Detects most common datatypes and default values
> * Supports running from the command line as well as from an IDE's debug
>   menu. See the COMMAND_LINE_MODE constant below for more info.
>
> Requirements:
> * Needs pyodbc python connector
>
> Created by Kyle Flanagan. Based on a script by Mariano Reingart which was
> based on a script to "generate schemas from dbs" (mysql) by Alexandre Andrade
> """
>
> __author__ = "Kyle Flanagan <[email protected]>"
>
> HELP = """
> USAGE: extract_mssql_models db host port user passwd
> Call with SQL Server database connection parameters,
> web2py model will be printed on standard output.
> EXAMPLE: python extract_mssql_models.py mydb localhost 3306 kflanaga pass
> or
> python extract_mssql_models.py mydb localhost 3306 kflanaga pass > db_model.py
> """
>
> # Config options
> DEBUG = False  # print debug messages to STDERR
> SCHEMA = 'dbo'
> COMMAND_LINE_MODE = True  # running from command prompt. Disable to specify variables and use in IDE
> DB = None
> HOST = None
> USER = None
> PASSWD = None
> PORT = None
>
> # Constant for Field keyword parameter order (and filter):
> KWARGS = ('type', 'length', 'default', 'required', 'ondelete',
>           'notnull', 'unique', 'label', 'comment', 'rname')
>
> import sys
> import re
> # This is from pydal/helpers/regex.py as of 2016-06-16
> # Use this to recognize if a field name needs to have an rname representation
> REGEX_VALID_TB_FLD = re.compile(r'^[^\d_][_0-9a-zA-Z]*\Z')
> # For replacing invalid characters in field names
> INVALID_CHARS = re.compile(r'[^a-zA-Z0-9_]')
>
>
> def get_valid_column_name(field):
>     """Return a valid column name that follows Python's rules for 
> identifiers, which is what web2py requires for column
>     names. Replaces invalid characters with underscores and leading digits 
> with their associated English word."""
>     if not REGEX_VALID_TB_FLD.match(field):
>         # If the first character is a digit, replace it with its word counterpart
>         if re.match(r'^[0-9]', field):
>             numbers = ['Zero', 'One', 'Two', 'Three', 'Four',
>                        'Five', 'Six', 'Seven', 'Eight', 'Nine']
>             field = numbers[int(field[0])] + field[1:]
>
>         field = INVALID_CHARS.sub('_', field)
>     return field
>
>
> def query(conn, sql, *args):
>     "Execute a SQL query and return rows as a list of dicts"
>     cur = conn.cursor()
>     ret = []
>     try:
>         if DEBUG: print >> sys.stderr, "QUERY: ", sql % args
>         cur.execute(sql % args)
>         for row in cur:
>             dic = {}
>             for i, value in enumerate(row):
>                 field = cur.description[i][0]
>                 dic[field] = value
>             if DEBUG: print >> sys.stderr, "RET: ", dic
>             ret.append(dic)
>         return ret
>     finally:
>         cur.close()
>
>
> def get_tables(conn, schema=SCHEMA):
>     "List table names in a given schema"
>     rows = query(conn, """SELECT table_name FROM information_schema.tables
>         WHERE table_schema = '%s'
>         ORDER BY table_name""", schema)
>     return [row['table_name'] for row in rows]
>
>
> def get_fields(conn, table):
>     "Retrieve field list for a given table"
>     if DEBUG: print >> sys.stderr, "Processing TABLE", table
>     rows = query(conn, """
>         SELECT column_name, data_type,
>             is_nullable,
>             character_maximum_length,
>             numeric_precision, numeric_precision_radix, numeric_scale,
>             column_default
>         FROM information_schema.columns
>         WHERE table_name='%s'
>         ORDER BY ordinal_position""", table)
>     return rows
>
>
> def define_field(conn, table, field, pks):
>     "Determine field type, default value, references, etc."
>     f = {}
>     ref = references(conn, table, field['column_name'])
>     if ref:
>         f.update(ref)
>     elif field['column_default'] and \
>             field['column_default'].startswith("nextval") and \
>                     field['column_name'] in pks:
>         f['type'] = "'id'"
>     elif field['data_type'].startswith('character'):
>         f['type'] = "'string'"
>         if field['character_maximum_length']:
>             f['length'] = field['character_maximum_length']
>     elif field['data_type'] in ('text', 'ntext'):
>         f['type'] = "'text'"
>     elif field['data_type'] in ('boolean', 'bit'):
>         f['type'] = "'boolean'"
>     elif field['data_type'] in ('tinyint', 'smallint', 'bigint', 'int'):
>         f['type'] = "'integer'"
>     elif field['data_type'] in ('real', 'float'):
>         f['type'] = "'double'"
>     elif field['data_type'] in ('datetime', 'datetime2', 'smalldatetime'):
>         f['type'] = "'datetime'"
>     elif field['data_type'] in ('timestamp',):
>         f['type'] = "'datetime'"
>         f['default'] = "request.now"
>         f['update'] = "request.now"
>     elif field['data_type'] in ('date',):
>         f['type'] = "'date'"
>     elif field['data_type'] in ('time',):
>         f['type'] = "'time'"
>     elif field['data_type'] in ('numeric', 'money', 'smallmoney', 'decimal'):
>         f['type'] = "'decimal'"
>         f['precision'] = field['numeric_precision']
>         f['scale'] = field['numeric_scale'] or 0
>     elif field['data_type'] in ('binary', 'varbinary', 'image'):
>         f['type'] = "'blob'"
>     elif field['data_type'] in ('point', 'lseg', 'polygon', 'unknown', 
> 'USER-DEFINED', 'sql_variant'):
>         f['type'] = ""  # unsupported?
>     elif field['data_type'] in ('varchar', 'char', 'nchar', 'nvarchar',
>                                 'uniqueidentifier'):
>         f['type'] = "'string'"
>     else:
>         raise RuntimeError("Data Type not supported: %s " % str(field))
>
>     try:
>         if field['column_default']:
>             if field['column_default'] == "now()":
>                 d = "request.now"
>             elif field['column_default'] == "true":
>                 d = "True"
>             elif field['column_default'] == "false":
>                 d = "False"
>             else:
>                 d = repr(eval(field['column_default']))
>             f['default'] = str(d)
>     except (ValueError, SyntaxError):
>         pass
>     except Exception, e:
>         raise RuntimeError("Default unsupported '%s'" % 
> field['column_default'])
>
>     if not field['is_nullable']:
>         f['notnull'] = "True"
>
>     # For field names that are not valid python identifiers, we need to add a
>     # reference to their actual name in the back end database
>     if not REGEX_VALID_TB_FLD.match(field['column_name']):
>         f['rname'] = "'[%s]'" % field['column_name']
>
>     return f
>
>
> def is_unique(conn, table, field):
>     "Find unique columns (incomplete support)"
>     rows = query(conn, """
>         SELECT c.column_name
>         FROM information_schema.table_constraints t
>         INNER JOIN information_schema.constraint_column_usage c
>         ON (t.CONSTRAINT_CATALOG =    c.CONSTRAINT_CATALOG
>             AND t.CONSTRAINT_NAME =   c.CONSTRAINT_NAME
>             AND t.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA
>             AND t.TABLE_CATALOG =     c.TABLE_CATALOG
>             AND t.TABLE_NAME =        c.TABLE_NAME
>             AND t.TABLE_SCHEMA =      c.TABLE_SCHEMA)
>         WHERE t.table_name='%s'
>           AND c.column_name='%s'
>           AND t.constraint_type='UNIQUE'
>         ;""", table, field['column_name'])
>     return rows and True or False
>
>
> def primarykeys(conn, table):
>     "Find primary keys"
>     rows = query(conn, """
>         SELECT c.column_name
>         FROM information_schema.table_constraints t
>         INNER JOIN information_schema.constraint_column_usage c
>                 ON (t.CONSTRAINT_CATALOG =    c.CONSTRAINT_CATALOG
>             AND t.CONSTRAINT_NAME =   c.CONSTRAINT_NAME
>             AND t.CONSTRAINT_SCHEMA = c.CONSTRAINT_SCHEMA
>             AND t.TABLE_CATALOG =     c.TABLE_CATALOG
>             AND t.TABLE_NAME =        c.TABLE_NAME
>             AND t.TABLE_SCHEMA =      c.TABLE_SCHEMA)
>         WHERE t.table_name='%s'
>           AND t.constraint_type='PRIMARY KEY'
>         ;""", table)
>     return [row['column_name'] for row in rows]
>
>
> def references(conn, table, field):
>     "Find a FK (fails if multiple)"
>     rows1 = query(conn, """
>         SELECT k.table_name, k.column_name, k.constraint_name,
>                r.update_rule, r.delete_rule, k.ordinal_position
>         FROM information_schema.key_column_usage k
>         INNER JOIN information_schema.referential_constraints r
>         ON (k.CONSTRAINT_CATALOG =    r.CONSTRAINT_CATALOG
>             AND k.CONSTRAINT_NAME =   r.CONSTRAINT_NAME
>             AND k.CONSTRAINT_SCHEMA = r.CONSTRAINT_SCHEMA)
>         INNER JOIN information_schema.table_constraints t
>         ON (r.CONSTRAINT_CATALOG =    t.CONSTRAINT_CATALOG
>             AND r.CONSTRAINT_NAME =   t.CONSTRAINT_NAME
>             AND r.CONSTRAINT_SCHEMA = t.CONSTRAINT_SCHEMA)
>
>         WHERE k.table_name='%s'
>           AND k.column_name='%s'
>           AND t.constraint_type='FOREIGN KEY'
>           ;""", table, field)
>     if len(rows1) == 1:
>         rows2 = query(conn, """
>             SELECT table_name, column_name, *
>             FROM information_schema.constraint_column_usage
>             WHERE constraint_name='%s'
>             """, rows1[0]['constraint_name'])
>         row = None
>         if len(rows2) > 1:
>             row = rows2[int(rows1[0]['ordinal_position']) - 1]
>             keyed = True
>         if len(rows2) == 1:
>             row = rows2[0]
>             keyed = False
>         if row:
>             if keyed:  # THIS IS BAD, DON'T MIX "id" and primarykey!!!
>                 ref = {'type': "'reference %s.%s'" % (row['table_name'],
>                                                       row['column_name'])}
>             else:
>                 ref = {'type': "'reference %s'" % (row['table_name'],)}
>             if rows1[0]['delete_rule'] != "NO ACTION":
>                 ref['ondelete'] = repr(rows1[0]['delete_rule'])
>             return ref
>         elif rows2:
>             raise RuntimeError("Unsupported foreign key reference: %s" %
>                                str(rows2))
>
>     elif rows1:
>         raise RuntimeError("Unsupported referential constraint: %s" %
>                            str(rows1))
>
>
> def define_table(conn, table):
>     "Output single table definition"
>     fields = get_fields(conn, table)
>     pks = primarykeys(conn, table)
>     print "db.define_table('%s'," % (table,)
>     for field in fields:
>         fname = field['column_name']
>         fdef = define_field(conn, table, field, pks)
>         if fname not in pks and is_unique(conn, table, field):
>             fdef['unique'] = "True"
>         if fdef['type'] == "'id'" and fname in pks:
>             pks.pop(pks.index(fname))
>         print "    Field('%s', %s)," % (get_valid_column_name(fname),
>                                         ', '.join(["%s=%s" % (k, fdef[k]) for 
> k in KWARGS
>                                                    if k in fdef and fdef[k]]))
>     if pks:
>         print "    primarykey=[%s]," % ", ".join(["'%s'" % pk for pk in pks])
>     print     "    migrate=migrate)"
>     print
>
>
> def define_db(conn, db, host, port, user, passwd):
>     "Output database definition (model)"
>     dal = 'db = DAL("mssql4://%s:%s@%s:%s/%s", pool_size=10, decode_credentials=True)'
>     print dal % (
>         user.replace('@', '%40').replace(':', '%3A'), passwd.replace('@', 
> '%40').replace(':', '%3A'), host, port, db)
>     print
>     print "migrate = False"
>     print
>     for table in get_tables(conn):
>         define_table(conn, table)
>
>
> if __name__ == "__main__":
>     # Parse arguments from command line:
>     if len(sys.argv) < 6 and COMMAND_LINE_MODE:
>         print HELP
>     else:
>         # Parse arguments from command line:
>         if COMMAND_LINE_MODE:
>             db, host, port, user, passwd = sys.argv[1:6]
>         else:
>             db = DB
>             host = HOST
>             user = USER
>             passwd = PASSWD
>             port = PORT
>
>         # Make the database connection (change driver if required)
>         import pyodbc
>         # cnn = pyodbc.connect(database=db, host=host, port=port,
>         #                        user=user, password=passwd,
>         #                        )
>         cnn = pyodbc.connect(
>             r'DRIVER={{SQL Server Native Client 11.0}};SERVER={server};PORT={port};DATABASE={db};UID={user};PWD={passwd}'.format(
>                 server=host, port=port, db=db, user=user, passwd=passwd)
>         )
>         # Start model code generation:
>         define_db(cnn, db, host, port, user, passwd)
>
>

-- 
Resources:
- http://web2py.com
- http://web2py.com/book (Documentation)
- http://github.com/web2py/web2py (Source code)
- https://code.google.com/p/web2py/issues/list (Report Issues)
--- 
You received this message because you are subscribed to the Google Groups 
"web2py-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/d/optout.

Reply via email to