----- Original Message -----
From: "David Goncalves" <[EMAIL PROTECTED]>
To: <pythonce@python.org>
Sent: Wednesday, January 03, 2007 3:09 AM
Subject: [PythonCE] Accessing serial port with PythonCE 2.5
Hi,
Everything is in the subject ;)
Is there any way to access the serial port with that build of PythonCE ?
I've not found any 'serial' module to import.
Regards.
Well there is no "serial" module included with PythonCE, but Python on my PC
doesn't have such a module either. I haven't done it myself on PythonCE but
I think the best way would be to use ctypes to access the Win32 serial
communications APIs. I have written some code to do this on the PC, so I
have attached it.
This code is not intended to be particularly efficient because I only used
it to write a serial protocol simulator, but hopefully it is fairly simple
to use. Again, I haven't tried it on Windows CE but I think it should work
with minor changes. For example, I can see that SerialPort.__init__() would
have to be modified slightly to open e.g. "COM1:" instead of "\\.\COM1".
Luke
import ctypes
import threading
import time
import Queue
#################################################
#
# NOTE: I am using unicode APIs in case this code is used on WinCE in future
BYTE = ctypes.c_ubyte
CHAR = ctypes.c_char
LPVOID = ctypes.c_void_p
LPCWSTR = ctypes.c_wchar_p
WORD = ctypes.c_ushort
DWORD = ctypes.c_uint
HANDLE = DWORD
INVALID_HANDLE_VALUE = -1 #0xffffffffL
GENERIC_READ = 0x80000000L
GENERIC_WRITE = 0x40000000L
GENERIC_EXECUTE = 0x20000000L
GENERIC_ALL = 0x10000000L
CREATE_NEW = 1
CREATE_ALWAYS = 2
OPEN_EXISTING = 3
OPEN_ALWAYS = 4
TRUNCATE_EXISTING = 5
def CheckHANDLE(value):
if value == INVALID_HANDLE_VALUE:
raise ctypes.WinError()
return value
def CheckBOOL(value):
if not value:
raise ctypes.WinError()
return value
class DCB(ctypes.Structure):
FLAG_BINARY = 0x1
FLAG_PARITY = 0x2
FLAG_OUTXCTSFLOW = 0x4
FLAG_OUTXDSRFLOW = 0x8
MASK_DTRCONTROL = 0x30
DTR_CONTROL_ENABLE = 0x10
FLAG_DSRSENSITIVITY = 0x40
FLAG_TXCONTINUEONXOFF = 0x80
FLAG_OUTX = 0x100
FLAG_INX = 0x200
FLAG_ERROR_CHAR = 0x400
FLAG_NULL = 0x800
MASK_RTS_CONTROL = 0x3000
RTS_CONTROL_ENABLE = 0x1000
NOPARITY = 0
ODDPARITY = 1
EVENPARITY = 2
MARKPARITY = 3
SPACEPARITY = 4
ONESTOPBIT = 0
ONE5STOPBITS = 1
TWOSTOPBITS = 2
_fields_ = [
('DCBlength', DWORD),
('BaudRate', DWORD),
('flags', DWORD),
('wReserved', WORD),
('XonLim', WORD),
('XoffLim', WORD),
('ByteSize', BYTE),
('Parity', BYTE),
('StopBits', BYTE), # 0=1, 1=1.5, 2=2
('XonChar', CHAR),
('XoffChar', CHAR),
('ErrorChar', CHAR),
('EofChar', CHAR),
('EvtChar', CHAR),
('wReserved1', WORD),
]
def __init__(self):
ctypes.Structure.__init__(self)
self.DCBlength = ctypes.sizeof(self)
MAXDWORD = 0xffffffffL
class COMMTIMEOUTS(ctypes.Structure):
_fields_ = [
('ReadIntervalTimeout', DWORD),
('ReadTotalTimeoutMultiplier', DWORD),
('ReadTotalTimeoutConstant', DWORD),
('WriteTotalTimeoutMultiplier', DWORD),
('WriteTotalTimeoutConstant', DWORD),
]
SETXOFF = 1
SETXON = 2
SETRTS = 3
CLRRTS = 4
SETDTR = 5
CLRDTR = 6
SETBREAK = 8
CLRBREAK = 9
kernel32 = ctypes.windll.kernel32
CreateFileW = kernel32.CreateFileW
# TODO: support SECURITY_ATTRIBUTES
CreateFileW.argtypes = [ LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE ]
CreateFileW.restype = CheckHANDLE
CloseHandle = kernel32.CloseHandle
CloseHandle.argtypes = [ HANDLE ]
CloseHandle.restype = CheckBOOL
ReadFile = kernel32.ReadFile
# TODO: support OVERLAPPED
ReadFile.argtypes = [ HANDLE, LPVOID, DWORD, ctypes.POINTER(DWORD), LPVOID ]
ReadFile.restype = CheckBOOL
WriteFile = kernel32.WriteFile
# TODO: support OVERLAPPED
WriteFile.argtypes = [ HANDLE, LPVOID, DWORD, ctypes.POINTER(DWORD), LPVOID ]
WriteFile.restype = CheckBOOL
GetCommState = kernel32.GetCommState
GetCommState.argtypes = [ HANDLE, ctypes.POINTER(DCB) ]
GetCommState.restype = CheckBOOL
SetCommState = kernel32.SetCommState
SetCommState.argtypes = [ HANDLE, ctypes.POINTER(DCB) ]
SetCommState.restype = CheckBOOL
GetCommTimeouts = kernel32.GetCommTimeouts
GetCommTimeouts.argtypes = [ HANDLE, ctypes.POINTER(COMMTIMEOUTS) ]
GetCommTimeouts.restype = CheckBOOL
SetCommTimeouts = kernel32.SetCommTimeouts
SetCommTimeouts.argtypes = [ HANDLE, ctypes.POINTER(COMMTIMEOUTS) ]
SetCommTimeouts.restype = CheckBOOL
SetupComm = kernel32.SetupComm
SetupComm.argtypes = [ HANDLE, DWORD, DWORD ]
SetupComm.restype = CheckBOOL
EscapeCommFunction = kernel32.EscapeCommFunction
EscapeCommFunction.argtypes = [ HANDLE, DWORD ]
EscapeCommFunction.restype = CheckBOOL
#################################################
class SerialPort(object):
def __init__(self, name):
if not name.startswith('\\\\.\\'):
name = '\\\\.\\' + name
self.name = name
self.handle = CreateFileW(name, GENERIC_READ | GENERIC_WRITE, 0, None,
OPEN_EXISTING, 0, 0)
def close(self):
if self.handle:
CloseHandle(self.handle)
self.handle = None
def __del__(self):
self.close()
def read(self, n):
dwRead = DWORD(0)
buf = ctypes.create_string_buffer(n)
ReadFile(self.handle, buf, n, ctypes.byref(dwRead), None)
return buf.raw[:dwRead.value]
def write(self, data):
dwWritten = DWORD(0)
WriteFile(self.handle, data, len(data), ctypes.byref(dwWritten), None)
assert dwWritten.value == len(data)
def Setup(self, baud=9600, bits=8, parity=None, non_blocking=False):
cto = COMMTIMEOUTS()
if non_blocking:
cto.ReadIntervalTimeout = MAXDWORD
cto.ReadTotalTimeoutConstant = 0
cto.ReadTotalTimeoutMultiplier = 0
cto.WriteTotalTimeoutMultiplier = 0
#cto.WriteTotalTimeoutConstant = 500
SetCommTimeouts(self.handle, ctypes.byref(cto))
SetupComm(self.handle, 4096, 4096)
dcb = DCB()
GetCommState(self.handle, ctypes.byref(dcb))
dcb.BaudRate = baud
dcb.flags = DCB.FLAG_BINARY
#dcb.flags = DCB.FLAG_BINARY | DCB.DTR_CONTROL_ENABLE |
DCB.RTS_CONTROL_ENABLE | DCB.FLAG_TXCONTINUEONXOFF
dcb.ByteSize = bits
if parity is None:
parity = DCB.NOPARITY
dcb.Parity = parity
dcb.StopBits = DCB.ONESTOPBIT
dcb.XonLim = 0
dcb.XoffLim = 0
SetCommState(self.handle, ctypes.byref(dcb))
self.SetDTR()
def SetDTR(self):
EscapeCommFunction(self.handle, SETDTR)
class SerialPortQueue(object):
def __init__(self, port):
self.port = port
self.lock = threading.RLock()
self.receive_thread = threading.Thread(target=self.ReceiveThreadProc)
self.send_thread = threading.Thread(target=self.SendThreadProc)
self.receive_queue = Queue.Queue(0)
self.send_queue = Queue.Queue(0)
def ReceiveThreadProc(self):
while not self.stopping:
self.lock.acquire()
data = ''
try:
data = self.port.read(16 * 1024)
finally:
self.lock.release()
if len(data) > 0:
self.receive_queue.put(data)
time.sleep(0.030)
def SendThreadProc(self):
while not self.stopping:
try:
data = self.send_queue.get_nowait()
except Queue.Empty:
time.sleep(0.030)
else:
self.lock.acquire()
try:
self.port.write(data)
finally:
self.lock.release()
def Start(self):
self.stopping = False
self.receive_thread.start()
self.send_thread.start()
def Stop(self):
self.stopping = True
self.receive_thread.join()
self.send_thread.join()
def Send(self, data):
self.send_queue.put(data)
def CheckReceive(self):
while True:
try:
data = self.receive_queue.get_nowait()
except Queue.Empty:
break
else:
self.OnReceive(data)
def OnReceive(self, data):
pass
_______________________________________________
PythonCE mailing list
PythonCE@python.org
http://mail.python.org/mailman/listinfo/pythonce