With help from Ed Blackman (err, actually, basically he wrote it :-)),
here is an updated version that has improved exception handling.

#!/usr/bin/env python
#
# Copyright (C) 2003 Jason R. Mastaler <[EMAIL PROTECTED]>
# Copyright (C) 2003 David Bremner <[EMAIL PROTECTED]>

# This is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.  A copy of this license should
# be included in the file COPYING.
#
# This script is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with TMDA; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

# $Id: tmda-check-senders,v 1.5 2003/06/02 10:37:11 bremner Exp $
"""Bad address checker, with optional pending queue cleanup
Usage:  %(program)s [OPTIONS] arg1 [arg2...]

With the -p option, non-option arguments are message-ids. Otherwise they
are addresses.

-p
--pending  operate on the pending queue

-d
--delete   delete messages with bad Reply-Path

-v
--verbose  be more verbose. repeat for more verbosity

-h
--help     print this help message



Requirements
============
Python, and also the PyDNS module, available at
<URL:http://pydns.sourceforge.net/>.
this also requires TMDA

Use
===
to run from your TMDA FILTER_INCOMING
add a 'pipe' entry similar to the following:

# drop mail from verified bogus addresses which can't be replied to
pipe '$HOME/bin/tmda-check-senders $SENDER' drop

Limitations
===========
This technique will not work with MTAs (e.g, qmail) where you
cannot verify the validity of an address via SMTP.  Luckily, most
MTAs do not have this limitation.

You need to change the hardcoded location for TMDA
You may wish to change the (hardcoded) timeout in the routine mxlookup,
and the default socket timeout below.

If you wish to use the pending queue functionality,
then you need valid Reply-path headers in your messages.


Authors
=======
Jason Mastaler wrote the original script.

David Bremner modified it to scan the pending dir (mostly as a way to debug),
and added timeout checking.

Ed Blackman contributed better handling of smtplib exceptions.

Please send any comments or complaints to David Bremner <[EMAIL PROTECTED]>

"""

import DNS # http://pydns.sourceforge.net/
import timeoutsocket     # http://www.timo-tasi.org/python/timeoutsocket.py
import smtplib
import socket
import sys
import os
import getopt
import random
from types import StringType
from email.Utils import parseaddr

# you need to change this
tmdahome=os.path.expanduser("~/pkg/tmda-0.77")
sys.path.insert(0,tmdahome)


from TMDA import Pending
from TMDA import Util

# default timeout for all sockets, in particular smtp
timeoutsocket.setDefaultSocketTimeout(10)

program = sys.argv[0]


def usage(code, msg=''):
    print __doc__ % globals()
    if msg:
        print msg
    sys.exit(code)

def mxlookup(name):
    """
    convenience routine for doing an MX lookup of a name. returns a
    sorted list of (preference, mail exchanger) records
    """
    a = DNS.Base.DnsRequest(name, qtype = 'mx', timeout=1).req().answers
    l = map(lambda x:x['data'], a)
    l.sort()
    return l

def checkMsg(pendingdir,msgid):
    msgfile= os.path.join(pendingdir, msgid )
            
    msgobj = Util.msg_from_file(open(msgfile, 'r'))
    address_to_verify= parseaddr(msgobj.get('return-path'))[1]
    if checkAddr(address_to_verify)==0:
        if delete:
            try:
                os.unlink(msgfile)
            except OSError:
                # in case of concurrent cleanups
                pass



# we preserve the semantics of smpt-check-sender, returning 0 for a match.

def checkAddr(address_to_verify):    
    mail_from_address = '[EMAIL PROTECTED]'

    if address_to_verify.count('@')>0:
        verify_hostname = address_to_verify.split('@', 1)[1]
    else:
        verify_hostname = 'localhost'


    # we read stdin, but don't actually use it

    try:
        mxlist = mxlookup(verify_hostname)

    except DNS.Base.DNSError:
        if verbose:
            print address_to_verify,' DNS lookup failed'
        return 1
        
        
    if mxlist:
        if type(mxlist[0]) is StringType:
                primx=mxlist[0]
        else:
                primx = mxlist[0][1]
        
    else:
        primx = verify_hostname
    # SMTP conversation

    if verbose>1:
        print 'connecting to ',primx

    try:
        server = smtplib.SMTP(primx)
        #server.set_debuglevel(1)
        server.docmd('helo', socket.getfqdn())
        server.docmd('mail from:', '<%s>' % mail_from_address)
        code, smtpmsg = server.docmd('rcpt to:', '<%s>' % address_to_verify)
        server.quit()
    except smtplib.SMTPResponseException, e:
        code, smtpmsg = e.smtp_code, e.smtp_error
    except (timeoutsocket.Timeout,socket.error,smtplib.SMTPException):
        if verbose:
            print address_to_verify,' SMTP connection failed'
        return 1
    
    # RFC 2821 is quite clear in stating that all codes starting
    # with 5 are always `permanent negative completion replies.
    if str(code).startswith('5'):
        print address_to_verify,
        if verbose:
            print ' BAD sender'
        else:
            print ''
        return 0
    else:
        if verbose:
            print address_to_verify, ' GOOD sender'
        return 1



try:
    opts,args=getopt.getopt(sys.argv[1:],
                               'dvhp', ['delete','verbose',
                                       'help','pending'
                                        ])
except getopt.error, msg:
    usage(1, msg)

verbose=0
delete=0
pending=0

for opt, arg in opts:
    if opt in ('-h', '--help'):
        usage(0)
    if opt in ('-v','--verbose'):
        verbose=verbose+1
    if opt in ('-d', '--delete'):
        delete=1
    if opt in ('-p', '--pending'):
        pending=1

DNS.DiscoverNameServers()

if pending:
    q = Pending.Queue()
    q.initQueue()

    if (len(args)==0):
        idList=q.listPendingIds()
        r=random.Random()
        r.shuffle(idList)
    else:
        idList=args
        
    for qEntry in idList:
        checkMsg(q.pendingdir,qEntry)
else:
    # emulate the behaviour of smtp-check-sender
    data = sys.stdin.read()

    status=0
    for addr in args:
        status=status or checkAddr(addr)

    sys.exit(status)








_____________________________________________
tmda-users mailing list ([EMAIL PROTECTED])
http://tmda.net/lists/listinfo/tmda-users

Reply via email to