Here is my code for Plug 'n Play detection of serial devices. It's not
yet ready for showtime, but it works with my serial-attached Wacom.
Best regards,
eulores
# -*- coding: utf-8 -*-
import sys
if sys.platform == 'win32':
    # On Windows, install the win32 event reactor. This runs before the
    # `from twisted.internet import reactor` line that follows it in the
    # file, so that import picks up the reactor installed here.
    from twisted.internet import win32eventreactor
    win32eventreactor.install()
from twisted.internet import reactor, protocol
from twisted.internet.task import deferLater, LoopingCall
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.serialport import *
import re
def wait(seconds):
    """Return a Deferred scheduled on the reactor `seconds` from now.

    Meant to be yielded from an inlineCallbacks generator as a
    non-blocking sleep; the scheduled callable simply returns None.
    """
    def _nothing():
        return None

    return deferLater(reactor, seconds, _nothing)
class StructuredPnP(object):
    """Parsed view of a serial Plug 'n Play identification string.

    The structured PnP field is available with these attributes:

    data:    string, required. Original unformatted string.
    other:   string, optional. Up to 16 characters received before the
             opening delimiter.
    rev:     integer, required. Revision of the PnP standard.
             100 is standard version 1.00. It is 0 when `data` could
             not be parsed.
    eisa:    string, required
    product: string, required
    serial:  string, optional
    klass:   string, optional
    compat:  string, optional. Other older products with
             similar functionality
    user:    string, optional. Free flowing field useful for
             the end-user

    Text fields that are absent from `data` (or all of them, when
    `data` does not match either framing) are left as ''.
    """

    # Sample inputs:
    #   7-bit framing, as sent by a Wacom UD-0608-R:
    #     '\\96,N,8,1(\x01$WAC0608\\\\PEN\\WAC0000\\WACOM UD\r\nUD-0608-R,V1.4-4\r\nF4)'
    #   7-bit framing, synthetic:
    #     'aaa(bbcccdddd\\eeeeeeee\\fff\\gggg\\hhhhii)'
    #   6-bit framing, synthetic:
    #     'AAA\x08BBCCCDDDD<EEEEEEEE<FFF<GGGG<HHHHII\x09'

    # Fields that hold text; all of them default to ''.
    _TEXT_FIELDS = ("other", "eisa", "product", "serial", "klass",
                    "compat", "user")
    # Fields shifted by 0x20 when the 6-bit framing is used.
    _SIX_BIT_FIELDS = ("eisa", "product", "serial", "klass", "compat",
                       "user")

    # Anything (at most 16 characters) preceding the opening delimiter.
    _PROLOGUE = r'(?P<other>[^(]{,16}?)'
    # '@' is a placeholder for the field separator; it is substituted
    # with the escaped separator of each framing below. The optional
    # fields must exclude the separator ([^@]) so that each one stops
    # at the next separator; the trailing (?:..)? swallows the
    # two-character checksum when present.
    _MATRIX = (
        r'(?P<rev>..)(?P<eisa>...)(?P<product>....)'
        r'(?:@(?P<serial>[^@]{,8}))?'
        r'(?:@(?P<klass>[^@]{,32}))?'
        r'(?:@(?P<compat>[^@]{,40}))?'
        r'(?:@(?P<user>.{,40}?))?'
        r'(?:..)?'
    )
    # 7-bit framing: '(' ... ')' with '\' as the field separator.
    _NEEDLE_7BIT = re.compile(
        _PROLOGUE + '\\(' + _MATRIX.replace('@', '\\\\') + '\\)', re.S)
    # 6-bit framing: 0x08 ... 0x09 with '<' (0x3C) as the separator.
    _NEEDLE_6BIT = re.compile(
        _PROLOGUE + '\\\x08' + _MATRIX.replace('@', '\\\x3C') + '\\\x09',
        re.S)

    def __init__(self, data):
        """Parse `data`, the raw string received from the device."""
        self.data = data
        for key in self._TEXT_FIELDS:
            setattr(self, key, '')
        # Two NUL characters decode to revision 0 when nothing matches.
        self.rev = '\0\0'

        fields = {}
        mo = self._NEEDLE_7BIT.match(data)
        if mo:
            fields = mo.groupdict()
        else:
            mo = self._NEEDLE_6BIT.match(data)
            if mo:
                fields = mo.groupdict()
                # 6-bit characters are sent offset by -0x20; shift them
                # back. Optional groups that did not take part in the
                # match are None and must be skipped.
                for key in self._SIX_BIT_FIELDS:
                    value = fields[key]
                    if value is not None:
                        fields[key] = ''.join(
                            [chr(ord(ch) + 0x20) for ch in value])

        for key, value in fields.items():
            # Keep the '' default for optional fields that are absent.
            if value is not None:
                setattr(self, key, value)

        # The revision is two characters carrying 6 significant bits
        # each, most significant first.
        self.rev = (((ord(self.rev[0]) & 0x3f) << 6)
                    + (ord(self.rev[1]) & 0x3f))

    def __str__(self):
        """Return the original, unparsed string."""
        return self.data

    def __repr__(self):
        """Return a multi-line dump of the raw string and every field."""
        l = ['<StructuredPnP object> %r' % self.data]
        for k in "other rev eisa product serial klass compat user".split():
            l.append(' %-8s %r' % (k, getattr(self, k, False)))
        return '\n'.join(l)
class PnPProtocol(protocol.Protocol):
    """Run the serial PnP handshake and collect the device's ID string.

    See Plug and Play External COM device Specification, rev 1.00
    from Microsoft & Hayes, 1994-1995.

    The protocol toggles DTR/RTS as soon as the connection is made,
    accumulates whatever the device sends, and fires `deferred` exactly
    once with a StructuredPnP built from the received data (possibly
    empty) when timer T5 expires.
    """

    def __init__(self, deferred):
        """Store `deferred` and start the overall T5 timeout.

        deferred: Deferred fired with the StructuredPnP result.
        """
        self.deferred = deferred
        self.data = ''
        # Overall budget for the handshake; shortened to a
        # per-character timeout once RTS has been raised.
        self.timeout = 1.4
        # NOTE: the timer starts at construction time, i.e. possibly
        # before (or without) a transport ever being attached.
        self.T5 = reactor.callLater(self.timeout, self.deliverPnP)

    def deliverPnP(self):
        """Close the port and hand the collected data to the deferred."""
        # The transport is still None when the timer fires although the
        # serial port could never be opened.
        if self.transport is not None:
            self.transport.loseConnection()
        if self.deferred is not None:
            # Clear the reference first so the deferred fires only once.
            d, self.deferred = self.deferred, None
            d.callback(StructuredPnP(self.data))

    def dataReceived(self, data):
        """Accumulate `data` and restart the per-character timeout."""
        if not self.T5.active():
            # The result has already been delivered; resetting a timer
            # that has fired would raise AlreadyCalled.
            return
        self.T5.reset(self.timeout)
        self.data += data
        if len(self.data) >= 256:
            # Enough data collected: deliver on the next reactor turn.
            self.T5.reset(0)

    @inlineCallbacks
    def connectionMade(self):
        """Drive the DTR/RTS handshake, pausing 0.2 s between steps.

        The numbered comments are the markers left by the original
        debug prints; they presumably refer to sections of the
        specification.
        """
        # `while 1` is only used as a block that can be left with break.
        while 1:
            # 2.1.1
            self.transport.setDTR(1)
            self.transport.setRTS(0)
            yield wait(0.2)
            if not self.transport.getDSR():
                # DSR not asserted: give up on the handshake.
                break
            # 2.1.3 part A
            self.transport.setDTR(0)
            yield wait(0.2)
            # 2.1.3 part B
            self.transport.setDTR(1)
            yield wait(0.2)
            # 2.1.4
            self.transport.setRTS(1)
            # timer T5 is now used for per-character timeout
            self.timeout = 0.2
            yield wait(0.2)
            if self.data:
                # The device answered; dataReceived now drives T5.
                break
            # 2.1.5
            self.transport.setDTR(0)
            self.transport.setRTS(0)
            yield wait(0.2)
            # 2.1.6
            self.transport.setDTR(1)
            self.transport.setRTS(1)
            yield wait(0.2)
            break
        if not self.data and self.T5.active():
            # Nothing received: deliver the (empty) result right away
            # instead of waiting for the remaining timeout.
            self.T5.reset(0)
        returnValue(None)

    def connectionLost(self, reason='connectionDone'):
        """Report why the connection was closed."""
        # Single-argument form prints the same text under both the
        # Python 2 print statement and the print function.
        print("Connection lost: %s" % (reason,))
def pnpString(port=0):
    """Probe serial `port` for a Plug 'n Play device.

    port: port identifier handed unchanged to SerialPort.

    Returns a Deferred that fires with a StructuredPnP. When the port
    cannot be opened the Deferred fires immediately with the result of
    parsing an empty string.
    """
    d = Deferred()
    # Named `proto` so the imported `protocol` module is not shadowed.
    proto = PnPProtocol(d)
    try:
        # 1200 baud, 7 data bits, no parity, 1 stop bit, no flow control.
        # (An earlier variant of this call additionally passed timeout=0.)
        SerialPort(proto, port, reactor, baudrate=1200,
                   bytesize=SEVENBITS, parity=PARITY_NONE,
                   stopbits=STOPBITS_ONE, xonxoff=0, rtscts=0)
    except serial.SerialException:
        # The protocol started its T5 timer in __init__. Without a
        # transport that timer must not be left to fire, and the
        # protocol must not fire `d` a second time.
        if proto.T5.active():
            proto.T5.cancel()
        proto.deferred = None
        d.callback(StructuredPnP(''))
    return d
@inlineCallbacks
def imRunning():
    """Probe port 3, print the PnP result, then stop the reactor."""
    try:
        pnp = yield pnpString(3)
        # Single-argument form works as both print statement and function.
        print("PnP string: %r" % (pnp,))
    finally:
        # Stop the reactor even if the probe failed, so the script
        # does not hang forever.
        reactor.stop()
if __name__ == "__main__":
    # Script entry point: queue the probe so that it starts as soon as
    # the reactor is running, then enter the event loop. reactor.run()
    # returns once imRunning has called reactor.stop().
    reactor.callWhenRunning(imRunning)
    reactor.run()
On Tue, Sep 14, 2010 at 11:24 AM, Markus Hubig <[email protected]> wrote:
> Hi @all!
> I'm trying to write a Python library module for a special
> serial communication protocol called IMPBUS. To use the serial
> interface for sending and receiving packets, for now I'm
> sub-classing pyserial. My code looks like this:
>
> But the problem is that I can't use select with pyserial on Windows,
> because it doesn't provide the fileno() method. So after some googling
> I found twisted.internet.serialport, "A select()able serial device, acting
> as a transport."
> I never used twisted before so I'm a little overwhelmed by how I can
> replace pyserial with twisted in the code above ... maybe someone can
> point me to the right direction. It seems I need a "Protocol" and a
> "receiver" ...
> - Markus
> --
_______________________________________________
Twisted-Python mailing list
[email protected]
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python