----- Original Message ----- From: "David Goncalves" <[EMAIL PROTECTED]>
To: <pythonce@python.org>
Sent: Wednesday, January 03, 2007 3:09 AM
Subject: [PythonCE] Accessing serial port with PythonCE 2.5


Hi,

Everything is in the subject ;)

Is there any way to access the serial port with that build of PythonCE ?
I've not found any 'serial' module to import.

Regards.

Well, there is no "serial" module included with PythonCE, but Python on my PC doesn't have such a module either. I haven't done it myself on PythonCE, but I think the best way would be to use ctypes to access the Win32 serial communications APIs. I have written some code to do this on the PC, so I have attached it.

This code is not intended to be particularly efficient because I only used it to write a serial protocol simulator, but hopefully it is fairly simple to use. Again, I haven't tried it on Windows CE but I think it should work with minor changes. For example, I can see that SerialPort.__init__() would have to be modified slightly to open e.g. "COM1:" instead of "\\.\COM1".

Luke

import ctypes
import threading
import time
import Queue

#################################################
#
# NOTE: I am using the Unicode (W) APIs in case this code is used on WinCE in the future

# Win32 scalar types expressed as ctypes types; used in the structure
# layouts and kernel32 prototypes below.
BYTE = ctypes.c_ubyte
CHAR = ctypes.c_char
LPVOID = ctypes.c_void_p
LPCWSTR = ctypes.c_wchar_p
WORD = ctypes.c_ushort
DWORD = ctypes.c_uint
# Handles are pointer-sized, so declare them as a void pointer rather than
# a 32-bit DWORD; integer handle values are still accepted as arguments.
HANDLE = ctypes.c_void_p

# Compared against the value CreateFileW hands to CheckHANDLE.  Because the
# prototype uses a callable restype, ctypes delivers the result as a signed
# C int, hence -1 rather than 0xffffffff.
INVALID_HANDLE_VALUE = -1

# Access-right bits (second argument of CreateFileW).  Written without the
# Python 2 "L" suffix: the literals are already non-negative on Python 2.4+
# and the suffix is a syntax error on Python 3.
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
GENERIC_EXECUTE = 0x20000000
GENERIC_ALL = 0x10000000

# Creation dispositions (fifth argument of CreateFileW).
CREATE_NEW = 1
CREATE_ALWAYS = 2
OPEN_EXISTING = 3
OPEN_ALWAYS = 4
TRUNCATE_EXISTING = 5

def CheckHANDLE(value):
    """Result check for kernel32 calls that return a handle.

    Returns *value* unchanged unless it equals INVALID_HANDLE_VALUE, in
    which case the exception built by ctypes.WinError() is raised.
    """
    if value != INVALID_HANDLE_VALUE:
        return value
    raise ctypes.WinError()

def CheckBOOL(value):
    """Result check for kernel32 calls that return a BOOL.

    Returns *value* unchanged when it is truthy; a falsy result raises the
    exception built by ctypes.WinError().
    """
    if value:
        return value
    raise ctypes.WinError()

class DCB(ctypes.Structure):
    """ctypes layout of the serial-port settings block.

    Instances are passed by reference to GetCommState / SetCommState.  The
    class attributes are ready-made values for the ``flags``, ``Parity``
    and ``StopBits`` fields.
    """

    # Single-bit switches and multi-bit masks within the ``flags`` field,
    # spelled as shifts so the bit position of each one is visible.
    FLAG_BINARY = 1 << 0
    FLAG_PARITY = 1 << 1
    FLAG_OUTXCTSFLOW = 1 << 2
    FLAG_OUTXDSRFLOW = 1 << 3
    MASK_DTRCONTROL = 0x3 << 4      # two-bit DTR control field
    DTR_CONTROL_ENABLE = 0x1 << 4   # value within MASK_DTRCONTROL
    FLAG_DSRSENSITIVITY = 1 << 6
    FLAG_TXCONTINUEONXOFF = 1 << 7
    FLAG_OUTX = 1 << 8
    FLAG_INX = 1 << 9
    FLAG_ERROR_CHAR = 1 << 10
    FLAG_NULL = 1 << 11
    MASK_RTS_CONTROL = 0x3 << 12    # two-bit RTS control field
    RTS_CONTROL_ENABLE = 0x1 << 12  # value within MASK_RTS_CONTROL

    # Values for the ``Parity`` field.
    NOPARITY = 0
    ODDPARITY = 1
    EVENPARITY = 2
    MARKPARITY = 3
    SPACEPARITY = 4

    # Values for the ``StopBits`` field.
    ONESTOPBIT = 0
    ONE5STOPBITS = 1
    TWOSTOPBITS = 2

    _fields_ = [
        ('DCBlength', DWORD),   # filled in by __init__ with the struct size
        ('BaudRate', DWORD),
        ('flags', DWORD),       # combination of the FLAG_* / *_CONTROL_* bits
        ('wReserved', WORD),
        ('XonLim', WORD),
        ('XoffLim', WORD),
        ('ByteSize', BYTE),
        ('Parity', BYTE),       # one of the *PARITY values
        ('StopBits', BYTE),     # 0 = 1 bit, 1 = 1.5 bits, 2 = 2 bits
        ('XonChar', CHAR),
        ('XoffChar', CHAR),
        ('ErrorChar', CHAR),
        ('EofChar', CHAR),
        ('EvtChar', CHAR),
        ('wReserved1', WORD),
    ]

    def __init__(self):
        """Create a zero-initialised DCB whose DCBlength is its own size."""
        super(DCB, self).__init__()
        self.DCBlength = ctypes.sizeof(self)

# Largest value a 32-bit DWORD can hold.  SerialPort.Setup() stores it in
# COMMTIMEOUTS.ReadIntervalTimeout when non_blocking is requested.
# No Python 2 "L" suffix: it is unnecessary on Python 2.4+ and a syntax
# error on Python 3.
MAXDWORD = 0xffffffff

class COMMTIMEOUTS(ctypes.Structure):
    """ctypes layout of the serial-port timeout block.

    Instances are passed by reference to GetCommTimeouts / SetCommTimeouts.
    All five members are DWORDs and start out as zero.
    """

    _fields_ = [
        # Read-side timeouts.
        ('ReadIntervalTimeout', DWORD),
        ('ReadTotalTimeoutMultiplier', DWORD),
        ('ReadTotalTimeoutConstant', DWORD),
        # Write-side timeouts.
        ('WriteTotalTimeoutMultiplier', DWORD),
        ('WriteTotalTimeoutConstant', DWORD),
    ]

# Function codes for the second argument of EscapeCommFunction, grouped in
# set/clear pairs.  Note that the value 7 is not used by any name here.
SETXOFF, SETXON = 1, 2
SETRTS, CLRRTS = 3, 4
SETDTR, CLRDTR = 5, 6
SETBREAK, CLRBREAK = 8, 9

kernel32 = ctypes.windll.kernel32


def _prototype(name, argtypes, restype):
    """Look up kernel32 export *name* and attach its ctypes signature.

    *restype* is one of the Check* callables, so every bound function
    raises on failure instead of returning an error indicator.
    """
    func = getattr(kernel32, name)
    func.argtypes = argtypes
    func.restype = restype
    return func


# Shared argument lists for functions that come in Get/Set or Read/Write
# pairs with identical signatures.
_IO_ARGS = [HANDLE, LPVOID, DWORD, ctypes.POINTER(DWORD), LPVOID]
_DCB_ARGS = [HANDLE, ctypes.POINTER(DCB)]
_TIMEOUT_ARGS = [HANDLE, ctypes.POINTER(COMMTIMEOUTS)]

# TODO: support SECURITY_ATTRIBUTES (fourth argument is a plain LPVOID)
CreateFileW = _prototype(
    'CreateFileW',
    [LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE],
    CheckHANDLE)

CloseHandle = _prototype('CloseHandle', [HANDLE], CheckBOOL)

# TODO: support OVERLAPPED (last argument is a plain LPVOID)
ReadFile = _prototype('ReadFile', list(_IO_ARGS), CheckBOOL)
WriteFile = _prototype('WriteFile', list(_IO_ARGS), CheckBOOL)

GetCommState = _prototype('GetCommState', list(_DCB_ARGS), CheckBOOL)
SetCommState = _prototype('SetCommState', list(_DCB_ARGS), CheckBOOL)

GetCommTimeouts = _prototype('GetCommTimeouts', list(_TIMEOUT_ARGS), CheckBOOL)
SetCommTimeouts = _prototype('SetCommTimeouts', list(_TIMEOUT_ARGS), CheckBOOL)

SetupComm = _prototype('SetupComm', [HANDLE, DWORD, DWORD], CheckBOOL)

EscapeCommFunction = _prototype('EscapeCommFunction', [HANDLE, DWORD], CheckBOOL)

#################################################

class SerialPort(object):
    """Thin wrapper around a Win32 serial-port handle.

    The port is opened in the constructor and closed by close() or when the
    object is garbage collected.  Every kernel32 call goes through the
    Check* result checks, so a failing call raises ctypes.WinError().
    """

    def __init__(self, name):
        """Open the serial port *name* for reading and writing.

        name -- port name such as 'COM1'.  The Win32 device prefix
                (backslash, backslash, dot, backslash) is prepended when
                the name does not already start with it.
        """
        # Define the attribute before anything can fail, so that close()
        # and __del__ work even when CreateFileW raises.
        self.handle = None
        if not name.startswith('\\\\.\\'):
            name = '\\\\.\\' + name
        self.name = name
        self.handle = CreateFileW(name, GENERIC_READ | GENERIC_WRITE, 0, None,
                                  OPEN_EXISTING, 0, 0)

    def close(self):
        """Close the port handle if it is open; safe to call repeatedly."""
        # getattr: __del__ may run on an instance whose __init__ never ran.
        handle = getattr(self, 'handle', None)
        # Forget the handle first so a failing CloseHandle is not retried
        # on the same value by a later close() / __del__.
        self.handle = None
        if handle:
            CloseHandle(handle)

    def __del__(self):
        self.close()

    def read(self, n):
        """Read at most *n* bytes and return them as a byte string.

        The result holds only the bytes ReadFile reported as read, so it
        may be shorter than *n* or empty.
        """
        dwRead = DWORD(0)
        buf = ctypes.create_string_buffer(n)
        ReadFile(self.handle, buf, n, ctypes.byref(dwRead), None)
        return buf.raw[:dwRead.value]

    def write(self, data):
        """Write the byte string *data* to the port.

        Raises IOError when fewer than len(data) bytes were written.
        """
        dwWritten = DWORD(0)
        WriteFile(self.handle, data, len(data), ctypes.byref(dwWritten), None)
        # An explicit check rather than assert, which vanishes under -O.
        if dwWritten.value != len(data):
            raise IOError('short write on %s: %d of %d bytes written'
                          % (self.name, dwWritten.value, len(data)))

    def Setup(self, baud=9600, bits=8, parity=None, non_blocking=False):
        """Configure timeouts, buffers and line settings, then raise DTR.

        baud         -- value stored in DCB.BaudRate.
        bits         -- value stored in DCB.ByteSize.
        parity       -- one of the DCB.*PARITY values; None selects
                        DCB.NOPARITY.
        non_blocking -- when true, ReadIntervalTimeout is set to MAXDWORD
                        while both read total timeouts stay zero.
        """
        cto = COMMTIMEOUTS()
        if non_blocking:
            cto.ReadIntervalTimeout = MAXDWORD
        cto.ReadTotalTimeoutConstant = 0
        cto.ReadTotalTimeoutMultiplier = 0
        cto.WriteTotalTimeoutMultiplier = 0
        # Disabled alternative: cto.WriteTotalTimeoutConstant = 500
        SetCommTimeouts(self.handle, ctypes.byref(cto))
        SetupComm(self.handle, 4096, 4096)

        # Start from the port's current settings and override selected ones.
        dcb = DCB()
        GetCommState(self.handle, ctypes.byref(dcb))
        dcb.BaudRate = baud
        dcb.flags = DCB.FLAG_BINARY
        # Alternative flag set kept for reference (disabled):
        # dcb.flags = (DCB.FLAG_BINARY | DCB.DTR_CONTROL_ENABLE |
        #              DCB.RTS_CONTROL_ENABLE | DCB.FLAG_TXCONTINUEONXOFF)
        dcb.ByteSize = bits
        if parity is None:
            parity = DCB.NOPARITY
        dcb.Parity = parity
        dcb.StopBits = DCB.ONESTOPBIT
        dcb.XonLim = 0
        dcb.XoffLim = 0
        SetCommState(self.handle, ctypes.byref(dcb))
        self.SetDTR()

    def SetDTR(self):
        """Send the SETDTR function code to the port."""
        EscapeCommFunction(self.handle, SETDTR)

class SerialPortQueue(object):
    """Pump data between a port object and two queues on worker threads.

    *port* must provide read(n) and write(data).  One thread polls
    port.read() and queues whatever arrives; another drains the send queue
    into port.write().  Both take the same lock, so reads and writes on
    the port never overlap.  Received data reaches OnReceive() only when
    the owner calls CheckReceive().
    """

    # Seconds the receive loop sleeps between reads and the send loop
    # waits for new data before re-checking the stop flag.
    POLL_INTERVAL = 0.030

    def __init__(self, port):
        """Prepare queues and (not yet started) worker threads for *port*."""
        self.port = port
        self.lock = threading.RLock()
        # Defined here, not only in Start(), so the thread procs and Stop()
        # never meet a missing attribute.
        self.stopping = False
        self._started = False
        self.receive_thread = threading.Thread(target=self.ReceiveThreadProc)
        self.send_thread = threading.Thread(target=self.SendThreadProc)
        self.receive_queue = Queue.Queue(0)
        self.send_queue = Queue.Queue(0)

    def ReceiveThreadProc(self):
        """Receive loop: read from the port until the stop flag is set."""
        while not self.stopping:
            self.lock.acquire()
            try:
                data = self.port.read(16 * 1024)
            finally:
                self.lock.release()
            if data:
                self.receive_queue.put(data)
            time.sleep(self.POLL_INTERVAL)

    def SendThreadProc(self):
        """Send loop: write queued data until the stop flag is set."""
        while not self.stopping:
            try:
                # Blocking get with a timeout: wakes as soon as data is
                # queued, yet still re-checks the stop flag regularly.
                data = self.send_queue.get(True, self.POLL_INTERVAL)
            except Queue.Empty:
                continue
            self.lock.acquire()
            try:
                self.port.write(data)
            finally:
                self.lock.release()

    def Start(self):
        """Clear the stop flag and start both worker threads."""
        self.stopping = False
        self.receive_thread.start()
        self.send_thread.start()
        self._started = True

    def Stop(self):
        """Set the stop flag and wait for both worker threads to finish.

        Does nothing beyond setting the flag when Start() was never called,
        because joining a thread that was never started raises.
        """
        self.stopping = True
        if not self._started:
            return
        self.receive_thread.join()
        self.send_thread.join()

    def Send(self, data):
        """Queue *data* for the send thread to write."""
        self.send_queue.put(data)

    def CheckReceive(self):
        """Pass every chunk received so far to OnReceive(), in order."""
        while True:
            try:
                data = self.receive_queue.get_nowait()
            except Queue.Empty:
                break
            else:
                self.OnReceive(data)

    def OnReceive(self, data):
        """Hook called by CheckReceive() per received chunk; no-op here."""
        pass



_______________________________________________
PythonCE mailing list
PythonCE@python.org
http://mail.python.org/mailman/listinfo/pythonce

Reply via email to