Update of /cvsroot/tmda/tmda/bin
In directory sc8-pr-cvs1:/tmp/cvs-serv6850/bin
Added Files:
tmda-manager
Log Message:
The Manager is a kind of shell to control TMDA.
For the moment, it can only do what tmda-gui does, i.e. managing the pending
queue and generating/checking addresses.
It's meant to run as a shell, or as a daemon. The latter mode enables a
user to use tmda-gui from a remote location, with no need of the TMDA
modules.
tmda-manager uses a new module called Auth.py, based on the code from
tmda-ofmipd. At some point, we'll need to move the common code into
Auth.
This is an early alpha release, so be careful with it. Try it on test
users first.
tmda-manager syntax:
Authentication:
auth user pass
List pending msgids:
pending
Handle message:
message action msgid
where action is one of show, terse, delete, release, whitelist,
blacklist
and msgid is the message id as shown by the pending command.
Generate address:
address type option
where type is one of dated, sender, keyword
and option is the date, the sender address or the keyword.
Check addresses:
checkaddress address arg
where address is the address to check and arg is the sender address
if needed.
More on this later.
--- NEW FILE ---
#!/usr/bin/env python
#
# Copyright (C) 2001,2002 Jason R. Mastaler <[EMAIL PROTECTED]>
#
# This file is part of TMDA.
#
# TMDA is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version. A copy of this license should
# be included in the file COPYING.
#
# TMDA is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with TMDA; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# Based on code from Python's (undocumented) smtpd module
# Copyright (C) 2001,2002 Python Software Foundation.
"""An authenticated ofmip proxy for TMDA. Tag your outgoing mail through SMTP.
See <URL:http://tmda.net/tmda-ofmipd.html> for complete setup and
usage information.
Usage: %(program)s [OPTIONS]
OPTIONS:
-h
--help
Print this message and exit.
-V
--version
Print TMDA version information and exit.
-d
--daemon
Daemon mode.
-D
--debug
Turn on debugging prints.
-u <username>
--username <username>
The username that this program should run under. The default
is to run as the user who starts the program unless that is
root, in which case an attempt to seteuid user `tofmipd' will be
made. Use this option to override these defaults.
-H <host:port>
--hostport <host:port>
The host:port to listen for incoming connections on. The
default is FQDN:8765 (i.e., port 8765 on the fully qualified
domain name for the local host).
-R proto[://host[:port]]
--remoteauth proto[://host[:port]][/dn]
Host to connect to to check username and password.
- proto can be one of the following:
`imap' (IMAP4 server)
'imaps' (IMAP4 server over SSL)
`pop3' (POP3 server)
`apop' (POP3 server with APOP authentication)
`ldap' (LDAP server)
- host defaults to localhost
- port defaults to 143 (imap), 993 (imaps), 110 (pop3/apop), 389 (ldap)
- dn is mandatory for ldap and should contain a `%%s' identifying
the username
Examples: -R imaps://myimapserver.net
-R pop3://mypopserver.net:2110
-R ldap://host.com/cn=%%s,dc=host,dc=com
-A <program>
--authprog <program>
checkpassword compatible command used to check username/password. e.g,
`-A /usr/sbin/checkpassword-pam -s id --stdin -- /bin/true'
The program must be able to receive the username/password pair
on its stdin, and in the following format:
`username\\0password\\0'
-a <file>
--authfile <file>
Path to the file holding authentication information for this
proxy. Default location is /etc/tofmipd if running as
root/tofmipd, otherwise ~user/.tmda/tofmipd. Use this option
to override these defaults.
"""
import getopt
import os
import socket
import sys
try:
import paths
except ImportError:
# Prepend /usr/lib/python2.x/site-packages/TMDA/pythonlib
sitedir = os.path.join(sys.prefix, 'lib', 'python'+sys.version[:3],
'site-packages', 'TMDA', 'pythonlib')
sys.path.insert(0, sitedir)
from TMDA import Util
from TMDA import Auth
from TMDA import Version
# Module-wide defaults; the command-line options parsed further down may
# override `hostport` and `daemon_mode`.
FQDN = socket.getfqdn()
# Port used when the listen address carries no explicit port.
defaultport = 8765
# Default listen address, in "host:port" form.
hostport = FQDN + ':' + str(defaultport)
# Name this script was invoked as; interpolated into the usage text.
program = sys.argv[0]
# Becomes true when -d/--daemon is given.
daemon_mode = None
def usage(code, msg=''):
    """Print the help text, then terminate the process.

    The module docstring is used as the help text; it is %-formatted with
    the module globals so that names such as `program` are filled in.

    code -- exit status handed to sys.exit().
    msg  -- optional extra message printed after the help text.
    """
    help_text = __doc__ % globals()
    print(help_text)
    if msg:
        print(msg)
    sys.exit(code)
# Parse the command line.  A malformed command line prints the usage text
# followed by getopt's message and exits with status 1.
try:
    opts, args = getopt.getopt(
        sys.argv[1:],
        'H:u:R:A:a:dDVh',
        ['hostport=',
         'username=',
         'authfile=',
         'remoteauth=',
         'authprog=',
         'daemon',
         'debug',
         'version',
         'help'])
except getopt.error as msg:
    usage(1, msg)

# Apply each option in the order given.  The help and version branches
# terminate the process, so a single elif chain is sufficient.
for opt, arg in opts:
    if opt in ('-h', '--help'):
        usage(0)
    elif opt == '-V':
        # Short form prints the full version information.
        print(Version.ALL)
        sys.exit()
    elif opt == '--version':
        # Long form prints only the TMDA version.
        print(Version.TMDA)
        sys.exit()
    elif opt in ('-D', '--debug'):
        Auth.DEBUGSTREAM = sys.stderr
    elif opt in ('-d', '--daemon'):
        daemon_mode = 1
    elif opt in ('-H', '--hostport'):
        hostport = arg
    elif opt in ('-u', '--username'):
        # NOTE(review): `username` is not read anywhere in the visible
        # part of this file -- confirm whether -u is meant to have an effect.
        username = arg
    elif opt in ('-R', '--remoteauth'):
        Auth.parse_auth_uri(arg)
    elif opt in ('-A', '--authprog'):
        Auth.authprog = arg
    elif opt in ('-a', '--authfile'):
        Auth.authfile = arg

# Set up the authentication layer once all options have been applied.
Auth.security_disclaimer()
Auth.init_auth_method()
import asynchat
import asyncore
import base64
import hmac
import md5
import popen2
import random
import time
__version__ = Version.TMDA
import readline
from cmd import Cmd
TMDAShellExit = ''
class TMDAShell(Cmd):
    """Line-oriented command interpreter for managing TMDA.

    Each `do_<name>` method implements the command `<name>`.  A command
    handler returns a true value to end the command loop and a false value
    to keep reading commands.  Every command except `auth`, `nop`, `exit`
    and EOF requires a prior successful `auth`.

    Errors are reported to the peer as a line of the form "-CODE: text".
    """

    # Protocol version announced in the banner.
    ProtoVer = '0.1'

    def __init__(self, prompt = '+', stdin = sys.stdin, stdout = sys.stdout,
                 stderr = sys.stderr):
        """Create a shell talking over the given streams.

        The streams are installed as sys.stdin, sys.stdout and sys.stderr
        for the whole process, so plain `print` output of the command
        handlers reaches the peer.  Callers that need the previous streams
        back must restore them themselves.

        prompt -- prompt string shown before each command.
        stdin, stdout, stderr -- file-like objects to install.
        """
        # self.use_raw_input = 1
        sys.stdin = stdin
        sys.stdout = stdout
        sys.stderr = stderr
        self.prompt = prompt
        self.authenticated = 0
        # Called after the streams are swapped so that Cmd sees the new ones.
        Cmd.__init__(self)

    def Print(self, *strings):
        """Write the given strings, space-separated, plus a newline."""
        self.stdout.write(' '.join(strings))
        self.stdout.write('\n')

    def write(self, string):
        """Write `string` to the shell's output stream unchanged."""
        self.stdout.write(string)

    def readline(self):
        """Read and return one line from the shell's input stream."""
        return self.stdin.readline()

    def outputData(self, data):
        """Send a data payload, one item per line, each prefixed by a space.

        data -- a string (split on newlines) or a sequence of strings;
                embedded newlines inside sequence items are split too.
        """
        if isinstance(data, str):
            data = data.split('\n')
        lines = []
        for item in data:
            lines.extend(item.split('\n'))
        sys.stdout.write(' ' + '\n '.join(lines) + '\n')

    def parseArgs(self, args):
        """Split `args` on single spaces and drop empty fields."""
        return [field for field in args.split(' ') if field]

    def mainLoop(self):
        """Run the command loop after printing the version banner.

        KeyboardInterrupt is not handled here; it propagates to the caller.
        """
        self.cmdloop('TMDA Manager v%s\nProtocole v%s' % (Version.TMDA,
                                                          self.ProtoVer))

    def log(self, *strings):
        """Write the concatenated values and a newline to sys.stderr."""
        for s in strings:
            sys.stderr.write(str(s))
        sys.stderr.write('\n')

    def error(self, ret, code, str):
        """Print an error line "-code: str" and return `ret`.

        The third parameter shadows the builtin `str`; the name is kept
        because it is part of the method's signature.
        """
        print("-%s: %s" % (code, str))
        return ret

    ## Cmd functions

    def help_auth(self):
        """Print the syntax of the `auth` command."""
        print("auth userid password")

    def do_nop(self, args):
        """Echo the argument string back as a data payload."""
        self.outputData("%s" % args)

    def do_auth(self, args):
        """Authenticate the session: `auth userid password`.

        Returns 0 (keep looping) on any error.  After the credentials are
        accepted by Auth.run_remoteauth, the result of Auth.auth_fork
        decides whether the command loop ends (1) or continues (0).
        """
        if self.authenticated:
            return self.error(0, "AUTH", "Already authenticated.")
        args = self.parseArgs(args)
        try:
            self.userid = args[0]
            self.password = args[1]
        except IndexError:
            self.userid = None
            self.password = None
            return self.error(0, "ARG", "Missing argument.")
        if Auth.run_remoteauth(self.userid, self.password):
            self.authenticated = 1
        else:
            self.authenticated = 0
            return self.error(0, "AUTH", "Invalid credentials.")
        # NOTE(review): presumably the parent side of the fork gets a true
        # value here and leaves the loop -- confirm against Auth.auth_fork.
        if Auth.auth_fork(self.userid):
            return 1
        return 0

    def help_address(self):
        """Print the (placeholder) help for the `address` command."""
        print("help address")

    def do_checkaddress(self, args):
        """Verify a tagged address: `checkaddress address [sender]`.

        Outputs "STATUS: VALID" (plus an "EXPIRES: ..." line when the
        address object offers a timestamp) or "STATUS:<reason>" when
        Address.AddressError is raised.
        """
        if not self.authenticated:
            return self.error(0, "AUTH", "Please authenticate first.")
        args = self.parseArgs(args)
        from TMDA import Address
        ## FIXME: add support for localtime, eventually
        localtime = None
        try:
            address = args[0]
        except IndexError:
            return self.error(0, "ARG", "Missing address to check.")
        try:
            sender = args[1]
        except IndexError:
            sender = None
        try:
            addr = Address.Factory(address)
            addr.verify(sender)
            self.outputData("STATUS: VALID")
            try:
                self.outputData("EXPIRES: %s" % addr.timestamp(1, localtime))
            except AttributeError:
                # this is not a dated address
                pass
        except Address.AddressError as msg:
            self.outputData("STATUS:" + str(msg))

    def do_address(self, args):
        """Generate a tagged address: `address [type [option]]`.

        type defaults to 'dated'; 'keyword' and 'sender' require the
        option argument.  Always returns 0.
        """
        if not self.authenticated:
            return self.error(0, "AUTH", "Please authenticate first.")
        args = self.parseArgs(args)
        from TMDA import Address
        try:
            tag = args[0]
        except IndexError:
            tag = 'dated'
        if tag not in ('dated', 'keyword', 'sender'):
            return self.error(0, "ARG", "first argument must be one of "
                              "'dated', 'keyword' or 'sender'.")
        try:
            arg = args[1]
        except IndexError:
            if tag in ('keyword', 'sender'):
                return self.error(0, "ARG",
                                  "You must provide a second argument.")
            arg = None
        address = None
        try:
            tagged_address = Address.Factory(tag = tag).create(address,
                                                               arg).address
        except ValueError:
            self.help_address()
            return 0
        if not tagged_address:
            self.help_address()
            return 0
        self.outputData(tagged_address)
        return 0

    def do_pending(self, args):
        """List message ids from the pending queue: `pending [selector]`.

        selector is one of 'all' (default), 'confirmed', 'released',
        'delivered', 'only', or 'queue' (runs the interactive queue).
        Always returns 0.
        """
        if not self.authenticated:
            return self.error(0, "AUTH", "Please authenticate first.")
        args = self.parseArgs(args)
        from TMDA import Pending
        if not args:
            args = [ 'all' ]
        if args[0] == 'queue':
            Pending.InteractiveQueue().initQueue().mainLoop()
            return 0
        if args[0] == 'all':
            args = Pending.Queue().initQueue().listIds()
        elif args[0] == 'confirmed':
            args = Pending.Queue().initQueue().listConfirmedIds()
        elif args[0] == 'released':
            args = Pending.Queue().initQueue().listReleasedIds()
        elif args[0] == 'delivered':
            args = Pending.Queue().initQueue().listDeliveredIds()
        elif args[0] == 'only':
            args = Pending.Queue().initQueue().listPendingIds()
        # NOTE(review): an unrecognised selector falls through and the
        # arguments themselves are echoed back -- confirm this is intended.
        self.outputData(args)
        return 0

    def do_message(self, args):
        """Act on one pending message: `message action msgid`.

        action is one of 'terse', 'summary', 'show', 'release', 'delete'.
        A missing argument or an unknown action yields an "ARG" error
        instead of raising, so a malformed command cannot kill the shell.
        Always returns 0.
        """
        if not self.authenticated:
            return self.error(0, "AUTH", "Please authenticate first.")
        args = self.parseArgs(args)
        # Validate before touching the queue (and before importing TMDA).
        if len(args) < 2:
            return self.error(0, "ARG", "Missing argument.")
        action, msgid = args[0], args[1]
        if action not in ('terse', 'summary', 'show', 'release', 'delete'):
            return self.error(0, "ARG", "Unknown action '%s'." % action)
        from TMDA import Pending
        message = Pending.Message(msgid).initMessage()
        if action == 'terse':
            for l in message.terse(tsv=0):
                # Keep each field on one output line.
                self.outputData(l.replace('\n', r'\n'))
        elif action == 'summary':
            self.outputData(message.summary())
        elif action == 'show':
            self.outputData(message.show())
        elif action == 'release':
            message.release()
        else:
            message.delete()
        return 0

    def do_exit(self, args):
        """Say goodbye and end the command loop."""
        print(' bye.')
        return 1

    def help_EOF(self):
        """Print how to leave the shell."""
        print("Control-D or exit to quit")

    def do_EOF(self, args):
        """Treat end-of-file on input like the `exit` command."""
        print('exit')
        return self.do_exit(args)
def _reap_children():
    """Collect finished child processes without blocking."""
    while 1:
        try:
            pid, _status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            # There are no child processes at all.
            return
        if not pid:
            # Children exist, but none of them has exited yet.
            return


def connection(hostport, ssl=0):
    """Listen on `hostport` and serve one TMDAShell per connection.

    Every accepted connection is handled in a forked child process; the
    parent keeps accepting until interrupted from the keyboard.

    hostport -- "host:port" or just "host" (the port then defaults to
                `defaultport`); the host name 'any' means 0.0.0.0.
    ssl      -- if true, wrap the connection with socket.ssl (marked as
                not working in the original code).

    Raises IOError when the port is not an integer.
    """
    import socket
    stdin = sys.stdin
    stdout = sys.stdout
    # Work out the listen address before any resource is acquired.
    try:
        (host, port) = hostport.split(':', 1)
    except ValueError:
        (host, port) = (hostport, defaultport)
    if host == 'any':
        host = '0.0.0.0'
    try:
        port = int(port)
    except ValueError:
        raise IOError("%s is not a valid port" % port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        print('Listening on %s:%s' % (host, port))
        sock.listen(5)
        while 1:
            conn, addr = sock.accept()
            # Children of earlier connections that have exited would
            # otherwise linger as zombies for the life of the daemon.
            _reap_children()
            pid = os.fork()
            if not pid:
                # Child process: serve this one connection, then leave.
                if ssl:
                    ## FIXME: ssl support doesn't work
                    sin = sout = socket.ssl(conn)
                else:
                    sin = conn.makefile('r', 0)
                    sout = conn.makefile('w', 0)
                print('accepted connection from %s:%s' % addr)
                try:
                    # The shell installs sin/sout as sys.stdin/sys.stdout
                    # and sends its error stream to the daemon's stdout.
                    TMDAShell('+', sin, sout, stdout).mainLoop()
                    sys.stdout = stdout
                except IOError as msg:
                    sys.stdout = stdout
                    print(msg)
                sys.stdin = stdin
                print('closing connection to ' + repr(addr))
                sin.close()
                sout.close()
                break
            # Parent process: the child owns the connection now.
            conn.close()
    except KeyboardInterrupt:
        print("Keyboard interrupt: exiting...")
    finally:
        # Runs in the parent on shutdown or error, and in each child once
        # it has finished serving its connection.
        sock.close()
def main():
    """Run as a network daemon if requested, else as a local shell."""
    if not daemon_mode:
        # Interactive use on the process's own standard streams.
        TMDAShell().mainLoop()
        return
    # Daemon mode: serve shells over the network, without SSL.
    connection(hostport, 0)

# This is the end my friend.
if __name__ == '__main__':
    main()
_______________________________________
tmda-cvs mailing list
http://tmda.net/lists/listinfo/tmda-cvs