Awesome!  Thanks a bunch for posting this.  The GPL nature of the
sAsync tool had me a bit perplexed also.

On Feb 20, 12:43 pm, David Bolen <[EMAIL PROTECTED]> wrote:
> "Matt Culbreth" <[EMAIL PROTECTED]> writes:
> > I'm playing out with a few things now and I wanted to see if anyone
> > else has used SQLAlchemy in an asynchronous manner?  For example, you
> > could create a service which responded to asynchronous requests for
> > data, and could be used by a web client, desktop client, other types
> > of clients, etc.
>
> Yep, I'm currently building a Twisted application (client/server)
> where the server is using SQLAlchemy.
>
> The sAsync project at http://foss.eepatents.com/sAsync/ seems ideally
> suited for this but I haven't seen any comments about it here.
>
> For what it's worth, I opted not to go with sAsync (which looks very
> nice) because I was looking for something non-GPL.  In the end, I
> rolled something small of my own, since my requirements were simple.
>
> I created a simple database class that contained a background thread
> for execution.  The background thread owns the engine and session
> objects and serializes any use of them through a run() method and
> internal queue.  E.g., very simple single-thread pool.  The
> application object instances themselves are accessed normally in the
> main Twisted thread.
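[Editor's note: the "very simple single-thread pool" handoff above can be sketched with the modern stdlib alone. This is a hypothetical analogue, not David's class: `concurrent.futures.ThreadPoolExecutor` with one worker stands in for the background thread plus queue, and the `Future` plays the role the Deferred plays in the Twisted version.]

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# A single-worker executor serializes every submitted call, so any state
# it touches (the engine/session in the real class) is only ever used
# from that one thread.
pool = ThreadPoolExecutor(max_workers=1)

worker_threads = set()

def db_op(x):
    # Record which thread ran us; with max_workers=1 it is always the same one.
    worker_threads.add(threading.current_thread().name)
    return x * 2

# submit() returns a Future immediately; result() blocks until the worker
# finishes, much as a Deferred's callback fires later in the Twisted version.
futures = [pool.submit(db_op, n) for n in range(5)]
results = [f.result() for f in futures]
print(results)              # [0, 2, 4, 6, 8]
print(len(worker_threads))  # 1 -- everything ran on the single worker
pool.shutdown()
```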
>
> When a Twisted routine wants to use SQLAlchemy operations it just
> calls run() or encapsulates them in a small function to operate on.  I
> originally toyed with wrapping session or engine objects to more
> dynamically or automatically adapt things, but wasn't comfortable with
> the result.  So in the end it's pretty explicit in the code, but then
> that's one of the attractive qualities to me of both Twisted and
> SQLAlchemy - not too much "magic" going on implicitly.
>
> The idea was that the overall objects involved are common at the
> Twisted thread level so your application is already set for a single
> threaded (asynchronous) environment.  This just defers the actual
> database operations (via SA) into the background thread, but doesn't
> attempt to simulate multiple threads of object state (such as having a
> distinct session per thread in a multi-threaded application might do).
>
> In case you're interested, it's tiny, so here's my current database
> class (still needs some final error checking polish, but I'm using it
> in development):
>
>           - - - - - - - - - - - - - - - - - - - - - - - - -
>
> import threading
> import Queue
>
> import sqlalchemy as sa
> import sqlalchemy.orm as orm
>
> from twisted.internet import reactor
> from twisted.internet.defer import Deferred
> from twisted.python.failure import Failure
>
> #
> # --------------------------------------------------------------------------
> #
>
> class Database(object):
>     """Wrapper for a SQLAlchemy engine supporting execution of database
>     related activities in a background thread with the result communicated
>     back through a deferred.
>
>     If a metadata object is provided to the constructor, it is connected
>     to the underlying engine within the background thread.  (It is
>     presumably a DynamicMetaData object.)"""
>
>     def __init__(self, db_url, metadata=None, *engine_args, **engine_kwargs):
>         self.queue = Queue.Queue()
>         self.started = threading.Event()
>
>         self.dbthread = threading.Thread(target=self._dbThread,
>                                          args=(db_url, metadata,
>                                                engine_args, engine_kwargs))
>         self.dbthread.setDaemon(True)
>         self.dbthread.start()
>         self.started.wait()
>
>     #
>     # Background execution thread
>     #
>
>     def _dbThread(self, db_url, metadata, eargs, ekwargs):
>         self.engine = sa.create_engine(db_url, *eargs, **ekwargs)
>         self.session = orm.create_session(self.engine)
>         if metadata:
>             metadata.connect(self.engine)
>         self.started.set()
>
>         while 1:
>             op = self.queue.get()
>             if op is None:
>                 return
>             else:
>                 func, args, kwargs, d = op
>                 try:
>                     result = d.callback, func(*args, **kwargs)
>                 except:
>                     result = d.errback, Failure()
>                 reactor.callFromThread(*result)
>
>     #
>     # Primary thread entry points
>     #
>
>     def run(self, func, *args, **kwargs):
>         result = Deferred()
>         self.queue.put((func, args, kwargs, result))
>         return result
>
>     def shutdown(self):
>         self.queue.put(None)
>         self.dbthread.join(2)
>
>           - - - - - - - - - - - - - - - - - - - - - - - - -
>
> Typical initialization (startup before initial reactor.run()):
>
>     db = database.Database('sqlite:///%s' % db_filename,
>                            metadata=schema.metadata)
>
> And here's a simple retrieval of objects through the ORM within a
> server routine (a PB published object):
>
>     def remote_jobs(self):
>         return self.db.run(lambda:self.db.session.query(model.Job).select())
>
> Here's an operation with a few more steps encapsulated into a function (it
> saves a copy of a deleted object into an archive table):
>
>     def remote_deleteJob(self, job_id):
>
>         def dbop(job_id=job_id):
>             s = self.db.session
>             job = s.query(model.Job).get(job_id)
>             s.delete(job)
>             # Create duplicate for archiving (Skipping _* attributes
>             # ignores all the SA state stuff)
>             archive = model.Job()
>             for attr, value in job.__dict__.items():
>                 if attr[0] != '_': setattr(archive, attr, value)
>             s.save(archive, entity_name='deleted')
>             s.flush()
>
>         d = self.db.run(dbop)
>         return d
>
> And if you've got Python 2.5, here's an inlineCallbacks method (slightly
> edited down; it assumes from twisted.internet.defer import
> inlineCallbacks, returnValue):
>
>     @inlineCallbacks
>     def remote_uploadRequest(self, job_id, job_file):
>         job = yield self.db.run(lambda:
>                                 self.db.session.query(model.Job).get(job_id))
>         if not job:
>             raise Exception('Invalid job')
>
>         # Mark that it hasn't been uploaded yet
>         job_file.uploaded = None
>
>         def saveFile(job_file=job_file):
>             self.db.session.save(job_file)
>             self.db.session.flush()
>
>         # Insert it into the database
>         yield self.db.run(saveFile)
>
>         upload_key = self.fileio.setupUpload(job_file.name)
>
>         returnValue({'port': self.port, 'key': upload_key})
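[Editor's note: inlineCallbacks works by driving the generator: each yielded Deferred pauses the method, and when the Deferred fires its result is sent back in at the yield. A hypothetical synchronous sketch of that trampoline, with zero-argument callables standing in for Deferreds, shows the shape of the idea; it is not Twisted's implementation.]

```python
def run_inline(gen):
    """Drive a generator the way inlineCallbacks does, but synchronously.

    Each yielded "pending result" (here a callable) is resolved immediately
    and sent back into the generator; StopIteration ends the run and carries
    the return value.
    """
    try:
        pending = next(gen)
        while True:
            pending = gen.send(pending())   # resolve, then resume at the yield
    except StopIteration as stop:
        return getattr(stop, "value", None)

def upload_request(job_id):
    # Stand-ins for self.db.run(...) calls: each yield hands back work
    # whose result reappears as the value of the yield expression.
    job = yield (lambda: {"id": job_id})
    saved = yield (lambda: True)
    return {"port": 8080, "saved": saved, "id": job["id"]}

print(run_inline(upload_request(7)))   # {'port': 8080, 'saved': True, 'id': 7}
```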
>
> -- David


You received this message because you are subscribed to the Google Groups "sqlalchemy" group.