#!/usr/bin/python
# -*- coding: utf-8 -*-
# Quoted from a python-list post; Netkiller wrote:
"""
Project: Network News Transport Protocol Server Program
Description:
    A database-backed newsgroup that lets a BBS front end read posts over
    the NNTP protocol.
"""
import re
import sys
from contextlib import closing

try:
    import MySQLdb
except ImportError:
    # The in-memory ``abstract`` backend and the article parser do not need a
    # database driver, so a missing MySQLdb only fails once a connection is
    # actually requested (see ``_require_driver``).
    MySQLdb = None

# NOTE(review): credentials are hard-coded exactly as in the original script;
# move them to configuration before running this anywhere real.
DB_SETTINGS = {
    "host": "localhost",
    "user": "root",
    "passwd": "chen",
    "db": "usenet",
}

# Header names extracted by ``Messages.parseArticle``.
_HEADER_NAMES = (
    'From',
    'Newsgroups',
    'Subject',
    'Date',
    'Lines',
    'X-Priority',
    'X-MSMail-Priority',
    'X-Newsreader',
    'X-MimeOLE',
    'X-RFC2646',
    'MIME-Version',
    'User-Agent',
    'Content-Type',
    'Content-Transfer-Encoding',
    'Xref',
)

# Value sub-patterns that differ from the generic ``(.+)``.
_HEADER_VALUE_PATTERNS = {
    'From': r'(.+ <.+>)',
}


def _compile_header(name):
    """Return a compiled pattern matching one ``name: value`` header line.

    The pattern is anchored at the start of a line so that, for example,
    ``Date`` does not also match ``NNTP-Posting-Date``.
    """
    value = _HEADER_VALUE_PATTERNS.get(name, r'(.+)')
    return re.compile(
        r'^' + re.escape(name) + r': ' + value + r'\r\n',
        re.IGNORECASE | re.MULTILINE,
    )


# Compiled once at import time instead of on every parseArticle() call.
_HEADER_PATTERNS = tuple(
    (name, _compile_header(name)) for name in _HEADER_NAMES
)


def _require_driver():
    """Return the MySQLdb module.

    Raises:
        ImportError: if MySQLdb is not installed.
    """
    if MySQLdb is None:
        raise ImportError("MySQLdb is required for database access")
    return MySQLdb


class NewsDB(object):
    """Run single queries, each on its own short-lived connection."""

    conn = None
    cursor = None

    def connect(self):
        """Open the connection and store it on ``self.conn``.

        On a MySQL error the error code and message are printed and the
        process exits with status 1 (behaviour kept from the original).
        """
        driver = _require_driver()
        try:
            self.conn = driver.connect(**DB_SETTINGS)
        except driver.Error as e:
            print("Error %d: %s" % (e.args[0], e.args[1]))
            sys.exit(1)

    def _run(self, sql, fetch):
        """Connect, execute ``sql``, apply ``fetch`` to the cursor, close."""
        self.connect()
        try:
            self.cursor = self.conn.cursor()
            self.cursor.execute(sql)
            return fetch(self.cursor)
        finally:
            # Always release the cursor/connection, even if the query fails.
            self.close()

    def fetchone(self, sql):
        """Execute ``sql`` and return the first row (or None)."""
        return self._run(sql, lambda cursor: cursor.fetchone())

    def fetchall(self, sql):
        """Execute ``sql`` and return every row."""
        return self._run(sql, lambda cursor: cursor.fetchall())

    def close(self):
        """Close the cursor and connection if they are open."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.conn is not None:
            self.conn.close()
            self.conn = None


class abstract(object):
    """In-memory stand-in backend that serves fixed sample data."""

    subject = None
    mail_from = None
    rcpt_to = None
    data = None
    # Each entry is (name, last, first, mode).
    grouplist = (
        ("cn.comp.linux", 5, 2, "y"),
        ("cn.comp.freebsd", 3, 2, "y"),
        ("cn.comp.dos", 10, 4, "y"),
        ("cn.test", 5, 2, "y"),
        ("comp.lang.python", 5, 2, "y"),
    )

    def welcome(self):
        """Return the greeting string."""
        return 'Welcome'

    def list(self):
        """Return one ``"name last first mode"`` string per group."""
        return [
            name + " " + str(last) + " " + str(first) + " " + mode
            for name, last, first, mode in self.grouplist
        ]

    def group(self, groupname):
        """Return ``"count first last"`` for ``groupname``, or ``""``.

        ``count`` is computed as ``last - first``.
        """
        for name, last, first, _mode in self.grouplist:
            if name == groupname:
                return str(last - first) + " " + str(first) + " " + str(last)
        return ""

    def xover(self, first, last):
        """Return two fixed sample overview lines; the range is ignored.

        NOTE: whitespace and addresses in the sample text are as they appear
        in the list archive ("[EMAIL PROTECTED]" is the archive's redaction).
        """
        return [
            "2 Mozilla programmer needed for children's learning program "
            '"John Fodor, PhD" <[EMAIL PROTECTED]> '
            "Mon, 23 Jan 2006 12:01:09 -0500 <[EMAIL PROTECTED]> 10532 276 "
            "Xref: number1.nntp.dca.giganews.com mozilla.jobs:2",
            '3 mozilla expert needed "Paul Sponagl" <[EMAIL PROTECTED]> '
            "Tue, 7 Mar 2006 20:00:52 +0100 <[EMAIL PROTECTED]> 2913 41 "
            "Xref: number1.nntp.dca.giganews.com mozilla.jobs:10",
        ]

    def head(self, first=None, last=None):
        """Return the same sample lines as ``xover``.

        The original referenced undefined ``first``/``last`` names and dropped
        the result; both arguments are now optional and the list is returned.
        """
        return self.xover(first, last)

    def xhdr(self, first, last):
        """Return one fixed sample header line; the range is ignored."""
        return [
            '1 HI "NNTP.HK" <[EMAIL PROTECTED]> 14 Jun 2006 14:43:05 +0800 '
            '<[EMAIL PROTECTED]> 1151 31 '
            'Xref: news.nntp.hk vip.cicefans:1\r\n'
        ]

    def newgroups(self, data, time, gmt):
        """Return two fixed sample group names; the arguments are ignored."""
        return ["cn.test.os", "cn.test.qa"]


class Messages(abstract):
    """News backend that reads and writes articles in the MySQL database."""

    banner = '200 \"Welcome to Netkiler News server\"\r\n'
    conn = None

    # NOTE(review): the real message-id expression was replaced with this
    # text by the mailing-list archive; restore it from the original source.
    _REDACTED_MESSAGE_ID = '<[EMAIL PROTECTED]>'

    def __init__(self):
        """Open the database connection used by every method."""
        self.conn = _require_driver().connect(**DB_SETTINGS)

    def list(self):
        """Return one ``"name last first mode"`` string per row of ``list``.

        All rows are returned (the original silently stopped after 10).
        """
        sql = """SELECT id,`group`,(select max(Number) from article) as last, (select min(Number) from article) as first, p FROM list"""
        with closing(self.conn.cursor()) as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
        return [
            name + " " + str(last) + " " + str(first) + " " + mode
            for _id, name, last, first, mode in rows
        ]

    def group(self, newsgroup):
        """Return ``(count, first, last, name)`` as strings, or None.

        None is returned when ``newsgroup`` has no row in ``list``.
        """
        sql = (
            "SELECT `group`,(select count(Number) from article) as number, "
            "(select max(Number) from article) as last, "
            "(select min(Number) from article) as first "
            "FROM list where `group` = %s limit 1"
        )
        with closing(self.conn.cursor()) as cursor:
            # Let the driver quote the value; never interpolate it into SQL.
            cursor.execute(sql, (newsgroup,))
            row = cursor.fetchone()
        if row is None:
            return None
        name, number, last, first = row
        return (str(number), str(first), str(last), name)

    def _overview_line(self, number, body):
        """Build the tab-separated overview line for one stored article.

        Missing Subject/From/Date headers become empty fields instead of
        raising KeyError.

        NOTE(review): the field layout is kept byte-compatible with the
        original (no References field, an empty field after the byte count).
        RFC 3977 expects message-id, references, bytes, lines - confirm
        against the clients in use before changing it.
        """
        parse = self.parseArticle(body)
        line = '\t'.join([
            str(number),
            parse.get('Subject', ''),
            parse.get('From', ''),
            parse.get('Date', ''),
            self._REDACTED_MESSAGE_ID,
            str(len(body)),
            '',
        ])
        for optional in ('Lines', 'Xref'):
            if optional in parse:
                line += '\t' + parse[optional]
        return line

    def xover(self, first, last):
        """Return overview lines for articles numbered ``first``..``last``."""
        sql = "select * from article where Number BETWEEN %s AND %s"
        with closing(self.conn.cursor()) as cursor:
            cursor.execute(sql, (first, last))
            rows = cursor.fetchall()
        return [
            self._overview_line(number, body)
            for number, _message_id, body, _date in rows
        ]

    def post(self, text):
        """Store ``text`` as a new article and return the inserted row count."""
        sql = "insert into article(body, `date`) values(%s,now())"
        with closing(self.conn.cursor()) as cursor:
            # Parameterized so that quotes in the article cannot break or
            # alter the statement.
            cursor.execute(sql, (text,))
            self.conn.commit()
            return cursor.rowcount

    def _article_text(self, id):
        """Return the stored text of article ``id``, or None if absent."""
        sql = "select * from article where Number=%s"
        with closing(self.conn.cursor()) as cursor:
            cursor.execute(sql, (id,))
            row = cursor.fetchone()
        if row is None:
            return None
        return row[2]

    def article(self, id):
        """Return the full stored text of article ``id`` (None if absent)."""
        return self._article_text(id)

    def head(self, id):
        """Return the full stored text of article ``id`` (None if absent).

        NOTE(review): like the original, this does not strip the body.
        """
        return self._article_text(id)

    def body(self, id):
        """Return the full stored text of article ``id`` (None if absent).

        NOTE(review): like the original, this does not strip the headers.
        """
        return self._article_text(id)

    def parseArticle(self, data):
        """Extract known headers and the body from raw article text.

        Example input (CRLF line endings):

            From: "NEO" <[EMAIL PROTECTED]>
            Newsgroups: cn.test
            Subject: test
            Date: Fri, 30 Jun 2006 17:14:17 +0800
            Lines: 3
            X-Priority: 3
            X-MSMail-Priority: Normal
            X-Newsreader: Microsoft Outlook Express 6.00.2900.2180
            X-MimeOLE: Produced By Microsoft MimeOLE V6.00.2900.2180
            X-RFC2646: Format=Flowed; Original

        Returns:
            A dict keyed by header name (only headers that are present), plus
            ``'Body'`` holding everything after the first blank line with one
            trailing CRLF removed, when the body is non-empty.

        Only the header section is searched, so body text that looks like a
        header cannot override a real one. Folded (continuation) header lines
        are not joined.
        """
        # NOTE: one line of the original at this point was replaced with
        # "[EMAIL PROTECTED]" by the list archive and is not recoverable.
        head, separator, body = data.partition('\r\n\r\n')
        # partition() consumed the CRLF that ends the last header line.
        header_text = head + '\r\n'
        parse = {}
        for name, pattern in _HEADER_PATTERNS:
            found = pattern.findall(header_text)
            if found:
                # The last occurrence wins, as in the original.
                parse[name] = found[-1]
        if separator:
            if body.endswith('\r\n'):
                body = body[:-2]
            if body:
                parse['Body'] = body
        return parse


def main():
    """Print the overview lines for articles 25 through 50."""
    test = Messages()
    print(test.xover('25', '50'))


if __name__ == '__main__':
    main()

# -- http://mail.python.org/mailman/listinfo/python-list