would you do one more test using this gql.py?
#!/usr/bin/env python # -*- coding: utf-8 -*- """ This file is part of web2py Web Framework (Copyrighted, 2007) Developed by Massimo Di Pierro <[email protected]> and Robin B <[email protected]> License: GPL v2 """ __all__ = ['GQLDB', 'Field'] import re import sys import os import types import cPickle import datetime import thread import cStringIO import csv import copy import socket import gluon.validators as validators import gluon.sqlhtml as sqlhtml import gluon.sql from new import classobj from google.appengine.ext import db as gae from google.appengine.api.datastore_types import Key from google.appengine.ext.db.polymodel import PolyModel MAX_ITEMS = 1000 # GAE main limitation Row = gluon.sql.Row Rows = gluon.sql.Rows Reference = gluon.sql.Reference SQLCallableList = gluon.sql.SQLCallableList table_field = re.compile('[\w_]+\.[\w_]+') SQL_DIALECTS = {'google': { 'boolean': gae.BooleanProperty, 'string': gae.StringProperty, 'text': gae.TextProperty, 'password': gae.StringProperty, 'blob': gae.BlobProperty, 'upload': gae.StringProperty, 'integer': gae.IntegerProperty, 'double': gae.FloatProperty, 'date': gae.DateProperty, 'time': gae.TimeProperty, 'datetime': gae.DateTimeProperty, 'id': None, 'reference': gae.IntegerProperty, 'list:string': (lambda: gae.StringListProperty(default=None)), 'list:integer': (lambda: gae.ListProperty(int,default=None)), 'list:reference': (lambda: gae.ListProperty(int,default=None)), 'lower': None, 'upper': None, 'is null': 'IS NULL', 'is not null': 'IS NOT NULL', 'extract': None, 'left join': None, }} def cleanup(text): if re.compile('[^0-9a-zA-Z_]').findall(text): raise SyntaxError, \ 'only [0-9a-zA-Z_] allowed in table and field names, received %s' \ % text return text def assert_filter_fields(*fields): for field in fields: if isinstance(field, (Field, Expression)) \ and field.type in ['text', 'blob']: raise SyntaxError, \ 'AppEngine does not index by: %s' % field.type def dateobj_to_datetime(obj): # convert dates, times to datetimes for AppEngine if isinstance(obj, datetime.datetime): pass elif isinstance(obj, datetime.date): obj = datetime.date(obj.year, obj.month, obj.day) elif isinstance(obj, datetime.time): obj = datetime.datetime( 1970, 1, 1, obj.hour, obj.minute, obj.second, obj.microsecond) return obj class GQLDB(gluon.sql.SQLDB): """ an instance of this class represents a database connection Example:: db=GQLDB() db.define_table('tablename', Field('fieldname1'), Field('fieldname2')) """ def __init__(self): self._uri = 'gae' self._dbname = 'gql' self['_lastsql'] = '' self.tables = SQLCallableList() self._translator = SQL_DIALECTS['google'] self._db_codec = 'UTF-8' def define_table( self, tablename, *fields, **args ): # these two lines are experimental if not fields and tablename.count(':'): (tablename, fields) = autofields(self, tablename) # if this table extends a polymodel, inherit fields from polymodel if isinstance(args.get('polymodel',None),Table): fields=[args['polymodel']]+[field for field in fields] tablename = cleanup(tablename) if tablename in dir(self) or tablename[0] == '_': raise SyntaxError, 'invalid table name: %s' % tablename if tablename in self.tables: raise SyntaxError, 'table already defined: %s' % tablename t = self[tablename] = Table(self, tablename, *fields) self.tables.append(tablename) t._create_references() t._create(polymodel=args.get('polymodel',None)) t._format = args.get('format', None) return t def __call__(self, where=''): if not where: where = '' return Set(self, where) def commit(self): pass def rollback(self): pass class SQLALL(object): def __init__(self, table): self.table = table class Table(gluon.sql.Table): """ an instance of this class represents a database table Example: db=GQLDB() db.define_table('users', Field('name')) db.users.insert(name='me') # print db.users._insert(...) to see SQL db.users.drop() """ def __init__( self, db, tablename, *fields ): new_fields = [ Field('id', 'id') ] for field in fields: if hasattr(field,'_db'): field = copy.copy(field) if isinstance(field, gluon.sql.Field): d=field.__dict__ field=Field('tmp') field.__dict__.update(d) if isinstance(field, Field): new_fields.append(field) elif isinstance(field, Table): new_fields += [copy.copy(field[f]) for f in field.fields if f != 'id'] else: raise SyntaxError, 'define_table argument \'%s\'is not a Field'%field fields = new_fields self._db = db self._tablename = tablename self.fields = SQLCallableList() self.virtualfields = [] fields = list(fields) # ## GAE Only, make sure uplodaded files go in datastore for field in fields: if isinstance(field, Field) and field.type == 'upload'\ and field.uploadfield == True: tmp = field.uploadfield = '%s_blob' % field.name fields.append(self._db.Field(tmp, 'blob', default='')) for field in fields: self.fields.append(field.name) self[field.name] = field field._tablename = self._tablename field._table = self field._db = self._db if field.requires == '<default>': field.requires = gluon.sql.sqlhtml_validators(field) self.ALL = SQLALL(self) def _create(self,polymodel=None): fields = [] myfields = {} for k in self.fields: if isinstance(polymodel,Table) and k in polymodel.fields(): continue field = self[k] attr = {} if isinstance(field.type, gluon.sql.SQLCustomType): ftype = self._db._translator[field.type.native or field.type.type](**attr) elif isinstance(field.type, gae.Property): ftype = field.type elif field.type.startswith('id'): continue elif field.type.startswith('reference'): if field.notnull: attr = dict(required=True) referenced = field.type[10:].strip() ftype = self._db._translator[field.type[:9]](self._db[referenced]) elif field.type.startswith('list:reference'): if field.notnull: attr = dict(required=True) referenced = field.type[15:].strip() ftype = self._db._translator[field.type[:14]](**attr) elif field.type.startswith('list:'): ftype = self._db._translator[field.type](**attr) elif not field.type in self._db._translator\ or not self._db._translator[field.type]: raise SyntaxError, 'Field: unknown field type: %s' % field.type else: ftype = self._db._translator[field.type](**attr) myfields[field.name] = ftype if not polymodel: self._tableobj = classobj(self._tablename, (gae.Model, ), myfields) elif polymodel==True: self._tableobj = classobj(self._tablename, (PolyModel, ), myfields) elif isinstance(polymodel,Table): self._tableobj = classobj(self._tablename, (polymodel._tableobj, ), myfields) else: raise RuntimeError, "polymodel must be None, True, a table or a tablename" return None def create(self): # nothing to do, here for backward compatibility pass def drop(self, mode = None): self.truncate(mode = mode) def truncate(self, mode = None): # nothing to do, here for backward compatibility self._db(self.id > 0).delete() def bulk_insert(self, *items): parsed_items = [] for item in items: fields = {} for field in self.fields: if not field in item and self[field].default != None: fields[field] = self[field].default elif not field in item and self[field].compute != None: fields[field] = self[field].compute(item) if field in item: fields[field] = obj_represent(item[field], self[field].type, self._db) #parsed_items.append(fields) parsed_items.append(self._tableobj(**fields)) gae.put(parsed_items) return True def insert(self, **fields): self._db['_lastsql'] = 'insert' for field in self.fields: if not field in fields and self[field].default != None: fields[field] = self[field].default elif not field in fields and self[field].compute != None: fields[field] = self[field].compute(fields) if field in fields: fields[field] = obj_represent(fields[field], self[field].type, self._db) tmp = self._tableobj(**fields) tmp.put() self['_last_reference'] = tmp rid = Reference(tmp.key().id()) (rid._table, rid._record) = (self, None) return rid class Expression(object): def __init__( self, name, type='string', db=None, ): (self.name, self.type, self._db) = (name, type, db) def __str__(self): return self.name def __or__(self, other): # for use in sortby assert_filter_fields(self, other) return Expression(self.name if self.type!='id' else '__key__' + '|' + other.name if other.type!='id' else '__key__', None, None) def __invert__(self): assert_filter_fields(self) return Expression('-' + self.name, self.type, None) # for use in Query def __eq__(self, value): return Query(self, '=', value) def __ne__(self, value): return Query(self, '!=', value) def __lt__(self, value): return Query(self, '<', value) def __le__(self, value): return Query(self, '<=', value) def __gt__(self, value): return Query(self, '>', value) def __ge__(self, value): return Query(self, '>=', value) def belongs(self, value): return Query(self, 'IN', value) def contains(self, *values): if self.type.startswith('list:'): return Query(self, 'IN', values) else: raise RuntimeError, "Not supported" # def like(self, value): return Query(self, ' LIKE ', value) # for use in both Query and sortby def __add__(self, other): return Expression('%s+%s' % (self, other), 'float', None) def __sub__(self, other): return Expression('%s-%s' % (self, other), 'float', None) def __mul__(self, other): return Expression('%s*%s' % (self, other), 'float', None) def __div__(self, other): return Expression('%s/%s' % (self, other), 'float', None) class Field(Expression, gluon.sql.Field): """ an instance of this class represents a database field example:: a=Field(name, 'string', length=32, required=False, default=None, requires=IS_NOT_EMPTY(), notnull=False, unique=False, uploadfield=True, widget=None, label=None, comment=None, writable=True, readable=True, update=None, authorize=None, autodelete=False, represent=None, uploadfolder=None) to be used as argument of GQLDB.define_table allowed field types: string, boolean, integer, double, text, blob, date, time, datetime, upload, password strings must have a length or 512 by default. fields should have a default or they will be required in SQLFORMs the requires argument are used to validate the field input in SQLFORMs """ def __init__( self, fieldname, type='string', length=None, default=None, required=False, requires='<default>', ondelete='CASCADE', notnull=False, unique=False, uploadfield=True, widget=None, label=None, comment=None, writable=True, readable=True, update=None, authorize=None, autodelete=False, represent=None, uploadfolder=None, compute=None ): self.name = fieldname = cleanup(fieldname) if fieldname in dir(Table) or fieldname[0] == '_': raise SyntaxError, 'Field: invalid field name: %s' % fieldname if isinstance(type, Table): type = 'reference ' + type._tablename if length == None: length = 512 self.type = type # 'string', 'integer' self.length = length # the length of the string self.default = default # default value for field self.required = required # is this field required self.ondelete = ondelete.upper() # this is for reference fields only self.notnull = notnull self.unique = unique self.uploadfield = uploadfield self.uploadfolder = uploadfolder self.widget = widget self.label = label self.comment = comment self.writable = writable self.readable = readable self.update = update self.authorize = authorize self.autodelete = autodelete self.represent = represent self.compute = compute self.isattachment = True if self.label == None: self.label = ' '.join([x.capitalize() for x in fieldname.split('_')]) if requires is None: self.requires = [] else: self.requires = requires def __str__(self): try: return '%s.%s' % (self._tablename, self.name) except: return '<no table>.%s' % self.name GQLDB.Field = Field # ## needed in gluon/globals.py session.connect GQLDB.Table = Table # ## needed in gluon/globals.py session.connect def obj_represent(obj, fieldtype, db): if type(obj) in (types.LambdaType, types.FunctionType): obj = obj() if isinstance(obj, (Expression, Field)): raise SyntaxError, "non supported on GAE" if isinstance(fieldtype, gluon.sql.SQLCustomType): return fieldtype.encoder(obj) if isinstance(fieldtype, gae.Property): return obj if obj == '' and not fieldtype[:2] in ['st','te','pa','up']: return None if obj != None: if fieldtype == 'date': if not isinstance(obj, datetime.date): (y, m, d) = [int(x) for x in str(obj).strip().split('-')] obj = datetime.date(y, m, d) elif fieldtype == 'time': if not isinstance(obj, datetime.time): time_items = [int(x) for x in str(obj).strip().split(':')[:3]] if len(time_items) == 3: (h, mi, s) = time_items else: (h, mi, s) = time_items + [0] obj = datetime.time(h, mi, s) elif fieldtype == 'datetime': if not isinstance(obj, datetime.datetime): (y, m, d) = [int(x) for x in str(obj)[:10].strip().split('-')] time_items = [int(x) for x in str(obj)[11:].strip().split(':')[:3]] if len(time_items) == 3: (h, mi, s) = time_items else: (h, mi, s) = time_items + [0] obj = datetime.datetime(y, m, d, h, mi, s) elif fieldtype == 'integer': obj = long(obj) elif fieldtype == 'double': obj = float(obj) elif fieldtype.startswith('reference'): if isinstance(obj, (Row, Reference)): obj = obj['id'] obj = long(obj) elif fieldtype == 'blob': pass elif fieldtype == 'boolean': if obj and not str(obj)[0].upper() == 'F': obj = True else: obj = False elif fieldtype.startswith('list:string'): if obj!=None and not isinstance(obj,(list,tuple)): obj=[obj] return [str(x) for x in obj] elif fieldtype.startswith('list:'): if obj!=None and not isinstance(obj,(list,tuple)): obj=[obj] return [int(x) for x in obj] elif isinstance(obj, str): obj = obj.decode('utf8') elif not isinstance(obj, unicode): obj = unicode(obj) return obj class Filter: def __init__(self,left,op,right): (self.left, self.op, self.right) = (left, op, right) def one(self): return self.left.type == 'id' and self.op == '=' def all(self): return self.left.type == 'id' and self.op == '>' and self.right == 0 def __str__(self): return '%s %s %s' % (self.left.name, self.op, self.right) class Query(object): """ A query object necessary to define a set. It can be stored or can be passed to GQLDB.__call__() to obtain a Set Example: query=db.users.name=='Max' set=db(query) records=set.select() """ def __init__( self, left, op=None, right=None, ): self.get_all = self.get_one = None if isinstance(left, list): self.filters = left return if isinstance(right, (Field, Expression)): raise SyntaxError, \ 'Query: right side of filter must be a value or entity: %s' \ % right if isinstance(left, Field): # normal filter: field op value assert_filter_fields(left) if left.type == 'id': try: if type(right) == list: #make this work for belongs right = [long(r) for r in right] else: right = long(right or 0) except ValueError: raise SyntaxError, 'id value must be integer: %s' % id if op != '=' and not (op == '>' and right == 0): #get key (or keys) based on path. Note if we later support # ancesters this will not be the proper key for items with # ancesters. #in GAE (with no ancesters) the key is base64 encoded # "table_name: id=<id>". GAE decodes the string and compares # the id if op=='IN': right = [Key.from_path(left._tablename, r) for r in right] else: right = Key.from_path(left._tablename, right) elif op=='IN': right = [dateobj_to_datetime(obj_represent(r, left.type, left._db)) \ for r in right] else: # filter dates/times need to be datetimes for GAE right = dateobj_to_datetime(obj_represent(right, left.type, left._db)) self.filters = [Filter(left, op, right)] return raise SyntaxError, 'not supported' def __and__(self, other): # concatenate list of filters # make sure all and one appear at the beginning if other.filters[0].one(): return Query(other.filters+self.filters) return Query(self.filters + other.filters) def __or__(self): raise RuntimeError, 'OR is not supported on GAE' def __invert__(self): if len(self.filters)!=1: raise RuntimeError, 'NOT (... AND ...) is not supported on GAE' filter = self.filters[0] if filter.op == 'IN': raise RuntimeError, 'NOT (... IN ...) is not supported on GAE' new_op = {'<':'>','>':'<','=':'!=','!=':'=','<=':'>=','>=':'<='}[filter.op] return Query(filter.left, new_op, filter.right) def __str__(self): return ' AND '.join([str(filter) for filter in self.filters]) class Set(gluon.sql.Set): """ As Set represents a set of records in the database, the records are identified by the where=Query(...) object. normally the Set is generated by GQLDB.__call__(Query(...)) given a set, for example set=db(db.users.name=='Max') you can: set.update(db.users.name='Massimo') set.delete() # all elements in the set set.select(orderby=db.users.id, groupby=db.users.name, limitby=(0, 10)) and take subsets: subset=set(db.users.id<5) """ def __init__(self, db, where=None): self._db = db if where: self.where = where self._tables = [filter.left._tablename for filter in where.filters] else: self._tables = [] self.where = None def __call__(self, where): if self.where: return Set(self._db, self.where & where) else: return Set(self._db, where) def _get_table_or_raise(self): tablenames = list(set(self._tables)) # unique if len(tablenames) < 1: raise SyntaxError, 'Set: no tables selected' if len(tablenames) > 1: raise SyntaxError, 'Set: no join in appengine' return self._db[tablenames[0]]._tableobj def _select(self, *fields, **attributes): valid_attributes = [ 'orderby', 'groupby', 'limitby', 'required', 'default', 'requires', 'left', 'cache', ] if [key for key in attributes.keys() if not key in valid_attributes]: raise SyntaxError, 'invalid select attribute: %s' % key if fields and isinstance(fields[0], SQLALL): self._tables.insert(0, fields[0].table._tablename) table = self._get_table_or_raise() tablename = table.kind() items = gae.Query(table) if not self.where: self.where = Query(fields[0].table.id,'>',0) for filter in self.where.filters: if filter.all(): #this is id > 0 continue elif filter.one() and filter.right<=0: #this is id == 0 items = [] elif filter.one(): #this is id == x item = self._db[tablename]._tableobj.get_by_id(filter.right) items = (item and [item]) or [] elif isinstance(items,list): (name, op, value) = \ (filter.left.name if filter.left.type!='id' else '__key__', filter.op, filter.right) if op == '=': op = '==' if op == 'IN': op = 'in' items = [item for item in items \ if eval("getattr(item,'%s') %s %s" % (name, op, repr(value)))] else: (name, op, value) = \ (filter.left.name if filter.left.type!='id' else '__key__', filter.op, filter.right) if filter.left.type=='id': items.order("__key__") items = items.filter('%s %s' % (name, op), value) if not isinstance(items,list): if attributes.get('left', None): raise SyntaxError, 'Set: no left join in appengine' if attributes.get('groupby', None): raise SyntaxError, 'Set: no groupby in appengine' orderby = attributes.get('orderby', False) if orderby: if isinstance(orderby, (list, tuple)): orderby = gluon.sql.xorify(orderby) assert_filter_fields(orderby) if orderby.type == 'id': orders = ['__key__'] else: orders = orderby.name.split('|') for order in orders: items = items.order(order) if attributes.get('limitby', None): (lmin, lmax) = attributes['limitby'] (limit, offset) = (lmax - lmin, lmin) items = items.fetch(limit, offset=offset) fields = self._db[tablename].fields return (items, tablename, fields) def select(self, *fields, **attributes): """ Always returns a Rows object, even if it may be empty cache attribute ignored on GAE """ (items, tablename, fields) = self._select(*fields, **attributes) self._db['_lastsql'] = 'SELECT WHERE %s' % self.where rows = [] for item in items: new_item = [] for t in fields: if t == 'id': new_item.append(int(item.key().id())) else: new_item.append(getattr(item, t)) rows.append(new_item) colnames = ['%s.%s' % (tablename, t) for t in fields] return self.parse(self._db, rows, colnames, False, SetClass=Set) @staticmethod def items_count(items): try: return len(items) except TypeError: return items.count() def count(self): (items, tablename, fields) = self._select() self._db['_lastsql'] = 'COUNT WHERE %s' % self.where return self.items_count(items) def delete(self): """ This function was changed on 2010-05-04 because according to http://code.google.com/p/googleappengine/issues/detail?id=3119 GAE no longer support deleting more than 1000 records. """ self._db['_lastsql'] = 'DELETE WHERE %s' % self.where (items, tablename, fields) = self._select() tableobj = self._db[tablename]._tableobj counter = self.items_count(items) # items can be one item or a query if not isinstance(items,list): leftitems = items.fetch(1000) while len(leftitems): gae.delete(leftitems) leftitems = items.fetch(1000) else: gae.delete(items) return counter def update(self, **update_fields): self._db['_lastsql'] = 'UPDATE WHERE %s' % self.where db = self._db (items, tablename, fields) = self._select() table = db[tablename] update_fields.update(dict([(fieldname, table[fieldname].update) \ for fieldname in table.fields \ if not fieldname in update_fields \ and table[fieldname].update != None])) update_fields.update(dict([(fieldname, table[fieldname].compute(update_fields)) \ for fieldname in table.fields \ if not fieldname in update_fields \ and table[fieldname].compute != None])) tableobj = table._tableobj counter = 0 for item in items: for (field, value) in update_fields.items(): value = obj_represent(update_fields[field], table[field].type, db) setattr(item, field, value) item.put() counter += 1 return counter def test_all(): """ How to run from web2py dir: eg. OSX: export PYTHONPATH=.:/usr/local/google_appengine python gluon/contrib/gql.py no output means all tests passed Setup the UTC timezone and database stubs >>> import os >>> os.environ['TZ'] = 'UTC' >>> # dev_server sets APPLICATION_ID, but we are not using dev_server, so manually set it to something >>> os.environ['APPLICATION_ID'] = 'test' >>> import time >>> if hasattr(time, 'tzset'): ... time.tzset() >>> >>> from google.appengine.api import apiproxy_stub_map >>> from google.appengine.api import datastore_file_stub >>> apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap() >>> apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', datastore_file_stub.DatastoreFileStub('doctests_your_app_id', '/dev/null', '/dev/null')) Create a table with all possible field types >>> db=GQLDB() >>> tmp=db.define_table('users', Field('stringf', 'string',length=32,required=True), Field('booleanf','boolean',default=False), Field('passwordf','password',notnull=True), Field('blobf','blob'), Field('uploadf','upload'), Field('integerf','integer',unique=True), Field('doublef','double',unique=True,notnull=True), Field('datef','date',default=datetime.date.today()), Field('timef','time'), Field('datetimef','datetime'), migrate='test_user.table') Insert a field >>> db.users.insert(stringf='a',booleanf=True,passwordf='p',blobf='0A', uploadf=None, integerf=5,doublef=3.14, datef=datetime.date(2001,1,1), timef=datetime.time(12,30,15), datetimef=datetime.datetime(2002,2,2,12,30,15)) 1 Select all # >>> all = db().select(db.users.ALL) Drop the table >>> db.users.drop() Select many entities >>> tmp = db.define_table(\"posts\", Field('body','text'), Field('total','integer'), Field('created_at','datetime')) >>> many = 20 #2010 # more than 1000 single fetch limit (it can be slow) >>> few = 5 >>> most = many - few >>> 0 < few < most < many True >>> for i in range(many): ... f=db.posts.insert(body='', total=i,created_at=datetime.datetime(2008, 7, 6, 14, 15, 42, i)) >>> >>> len(db().select(db.posts.ALL)) == many True >>> len(db().select(db.posts.ALL,limitby=(0,most))) == most True >>> len(db().select(db.posts.ALL,limitby=(few,most))) == most - few True >>> order = ~db.posts.total|db.posts.created_at >>> results = db().select(db.posts.ALL,limitby=(most,most+few),orderby=order) >>> len(results) == few True >>> results[0].total == few - 1 True >>> results = db().select(db.posts.ALL,orderby=~db.posts.created_at) >>> results[0].created_at > results[1].created_at True >>> results = db().select(db.posts.ALL,orderby=db.posts.created_at) >>> results[0].created_at < results[1].created_at True >>> db(db.posts.total==few).count() 1 >>> db(db.posts.id==2*many).count() 0 >>> db(db.posts.id==few).count() 1 >>> db(db.posts.id==str(few)).count() 1 >>> len(db(db.posts.id>0).select()) == many True >>> db(db.posts.id>0).count() == many True >>> set=db(db.posts.total>=few) >>> len(set.select())==most True >>> len(set(db.posts.total<=few).select()) 1 # test timezones >>> class TZOffset(datetime.tzinfo): ... def __init__(self,offset=0): ... self.offset = offset ... def utcoffset(self, dt): return datetime.timedelta(hours=self.offset) ... def dst(self, dt): return datetime.timedelta(0) ... def tzname(self, dt): return 'UTC' + str(self.offset) ... >>> SERVER_OFFSET = -8 >>> >>> stamp = datetime.datetime(2008, 7, 6, 14, 15, 42, 828201) >>> post_id = db.posts.insert(created_at=stamp) >>> naive_stamp = db(db.posts.id==post_id).select()[0].created_at >>> utc_stamp=naive_stamp.replace(tzinfo=TZOffset()) >>> server_stamp = utc_stamp.astimezone(TZOffset(SERVER_OFFSET)) >>> stamp == naive_stamp True >>> utc_stamp == server_stamp True >>> db(db.posts.id>0).count() == many + 1 True >>> db(db.posts.id==post_id).delete() 1 >>> db(db.posts.id>0).count() == many True >>> id = db.posts.insert(total='0') # coerce str to integer >>> db(db.posts.id==id).delete() 1 >>> db(db.posts.id > 0).count() == many True >>> set=db(db.posts.id>0) >>> set.update(total=0) # update entire set 20 >>> db(db.posts.total == 0).count() == many True >>> db.posts.drop() >>> db(db.posts.id>0).count() 0 Examples of insert, select, update, delete >>> tmp=db.define_table('person', Field('name'), Field('birth','date'), migrate='test_person.table') >>> person_id=db.person.insert(name=\"Marco\",birth='2005-06-22') >>> person_id=db.person.insert(name=\"Massimo\",birth='1971-12-21') >>> len(db().select(db.person.ALL)) 2 >>> me=db(db.person.id==person_id).select()[0] # test select >>> me.name 'Massimo' >>> db(db.person.name=='Massimo').update(name='massimo') # test update 1 >>> me = db(db.person.id==person_id).select()[0] >>> me.name 'massimo' >>> str(me.birth) '1971-12-21' # resave date to ensure it comes back the same >>> me=db(db.person.name=='Massimo').update(birth=me.birth) # test update >>> me = db(db.person.id==person_id).select()[0] >>> me.birth datetime.date(1971, 12, 21) >>> db(db.person.name=='Marco').delete() # test delete 1 >>> len(db().select(db.person.ALL)) 1 Update a single record >>> me.update_record(name=\"Max\") >>> me.name 'Max' Examples of complex search conditions >>> len(db((db.person.name=='Max')&(db.person.birth<'2003-01-01')).select()) 1 >>> len(db((db.person.name=='Max')&(db.person.birth<datetime.date(2003,01,01))).select()) 1 # >>> len(db((db.person.name=='Max')|(db.person.birth<'2003-01-01')).select()) # 1 >>> me=db(db.person.id==person_id).select(db.person.name)[0] >>> me.name 'Max' Examples of search conditions using extract from date/datetime/time # >>> len(db(db.person.birth.month()==12).select()) # 1 # >>> len(db(db.person.birth.year()>1900).select()) # 1 Example of usage of NULL >>> len(db(db.person.birth==None).select()) ### test NULL 0 # filter api does not support != yet # >>> len(db(db.person.birth!=None).select()) ### test NULL # 1 Examples of search consitions using lower, upper, and like # >>> len(db(db.person.name.upper()=='MAX').select()) # 1 # >>> len(db(db.person.name.like('%ax')).select()) # 1 # >>> len(db(db.person.name.upper().like('%AX')).select()) # 1 # >>> len(db(~db.person.name.upper().like('%AX')).select()) # 0 orderby, groupby and limitby >>> people=db().select(db.person.ALL,orderby=db.person.name) >>> order=db.person.name|~db.person.birth >>> people=db().select(db.person.ALL,orderby=order) # no groupby in appengine # >>> people=db().select(db.person.ALL,orderby=db.person.name,groupby=db.person.name) >>> people=db().select(db.person.ALL,orderby=order,limitby=(0,100)) Example of one 2 many relation >>> tmp=db.define_table('dog', Field('name'), Field('birth','date'), Field('owner',db.person), migrate='test_dog.table') >>> dog_id=db.dog.insert(name='Snoopy',birth=None,owner=person_id) A simple JOIN >>> len(db(db.dog.owner==person_id).select()) 1 >>> len(db(db.dog.owner==me.id).select()) 1 # test a table relation >>> dog = db(db.dog.id==dog_id).select()[0] >>> me = db(db.person.id==dog.owner).select()[0] >>> me.dog.select()[0].name 'Snoopy' Drop tables >>> db.dog.drop() >>> db.person.drop() Example of many 2 many relation and Set >>> tmp=db.define_table('author',Field('name'), migrate='test_author.table') >>> tmp=db.define_table('paper',Field('title'), migrate='test_paper.table') >>> tmp=db.define_table('authorship', Field('author_id',db.author), Field('paper_id',db.paper), migrate='test_authorship.table') >>> aid=db.author.insert(name='Massimo') >>> pid=db.paper.insert(title='QCD') >>> tmp=db.authorship.insert(author_id=aid,paper_id=pid) Define a Set >>> authorships=db(db.authorship.author_id==aid).select() >>> for authorship in authorships: ... papers=db(db.paper.id==authorship.paper_id).select() ... for paper in papers: print paper.title QCD Example of search condition using belongs # >>> set=(1,2,3) # >>> rows=db(db.paper.id.belongs(set)).select(db.paper.ALL) # >>> print rows[0].title # QCD Example of search condition using nested select # >>> nested_select=db()._select(db.authorship.paper_id) # >>> rows=db(db.paper.id.belongs(nested_select)).select(db.paper.ALL) # >>> print rows[0].title # QCD Output in csv # >>> str(authored_papers.select(db.author.name,db.paper.title)) # 'author.name,paper.title\r Massimo,QCD\r ' Delete all leftover tables # >>> GQLDB.distributed_transaction_commit(db) >>> db.authorship.drop() >>> db.author.drop() >>> db.paper.drop() # self reference >>> tmp = db.define_table('employees', ... Field('name'), ... Field('email'), ... Field('phone'), ... Field('foto','upload'), ... Field('manager','reference employees') ... ) >>> id1=db.employees.insert(name='Barack') >>> id2=db.employees.insert(name='Hillary',manager=id1) >>> barack = db.employees[id1] >>> hillary = db.employees[id2] >>> hillary.manager == barack.id True """ SQLField = Field SQLTable = Table SQLXorable = Expression SQLQuery = Query SQLSet = Set SQLRows = Rows SQLStorage = Row if __name__ == '__main__': import doctest doctest.testmod()

