AI Generated Usenet Client in Python

If you have any improvements to this or helpful ideas I would like to see them.

https://github.com/alt-magick/Newsgroup-Client-/

#!/usr/bin/env python3
import nntplib
import sys
import termios
import tty
import re
import quopri
import base64
from email.message import EmailMessage
import os
import tempfile
import subprocess

# ================= USER CONFIG =================
NNTP_SERVER = "usnews.blocknews.net"
NNTP_PORT   = 563
USERNAME    = ""
PASSWORD    = ""
PAGE_LINES             = 12
MAX_ARTICLES_LIST      = 200
MAX_REPLY_SCAN         = 300
START_GROUP            = "alt.test"
SHOW_REPLY_COUNT       = False
SHOW_REPLY_COUNT_MAIN  = True
# ==============================================

RE_REPLY = re.compile(r"^(re|fwd):", re.IGNORECASE)
CLEAN_RE = re.compile(r"[---]")

# ---------- STATUS LINE ----------
STATUS_LINE = ""

def set_status(msg):
    global STATUS_LINE
    STATUS_LINE = msg

def show_status():
    global STATUS_LINE
    if STATUS_LINE:
        print(f"
[{STATUS_LINE}]")
        STATUS_LINE = ""

# ---------- RAW KEY INPUT ----------
def get_key():
    fd = sys.stdin.fileno()
    old = termios.tcgetattr(fd)
    try:
        tty.setraw(fd)
        return sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old)

def prompt(text):
    sys.stdout.write(text)
    sys.stdout.flush()
    return sys.stdin.readline().strip()

# ---------- HARD-CODED PAGER ----------
def paged_print(lines):
    i = 0
    total = len(lines)
    while i < total:
        end = min(i + PAGE_LINES, total)
        for line in lines[i:end]:
            print(line)
        i = end
        if i >= total:
            break
        print("
--- ENTER = next page | SPACE = skip ---")
        if get_key() == " ":
            break

# ---------- BODY DECODER ----------
def decode_body_line(line_bytes):
    s = line_bytes.decode("utf-8", errors="replace")
    s = CLEAN_RE.sub("", s)
    if "=" in s:
        try:
            s = quopri.decodestring(s).decode("utf-8", errors="replace")
        except Exception:
            pass
    if re.fullmatch(r"[A-Za-z0-9+/=\s]+", s) and len(s.strip()) > 20:
        try:
            s = base64.b64decode(s, validate=True).decode("utf-8", 
errors="replace")
        except Exception:
            pass
    return s

# ---------- POST BODY EDITOR ----------
def edit_body(initial=""):
    editor = os.environ.get("EDITOR", "nano")
    fd, path = tempfile.mkstemp(suffix=".txt")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(initial)
        subprocess.call([editor, path])
        with open(path, "r") as f:
            return f.read()
    finally:
        os.unlink(path)

# ---------- POST BODY SOURCE ----------
def get_post_body():
    print("
Post body source:")
    print("  E = Edit in editor")
    print("  F = Load from external text file")
    print("  T = Type directly in terminal")
    choice = get_key().lower()

    if choice == "f":
        path = prompt("
Enter path to text file: ")
        try:
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                body = f.read()
            if not body.strip():
                set_status("Post aborted (file is empty)")
                return None
            return body
        except Exception as e:
            set_status(f"Failed to read file: {e}")
            return None

    elif choice == "t":
        print("
Type your post below. End with a period on a line by itself.")
        lines = []
        while True:
            line = input()
            if line.strip() == ".":
                break  # stop input when user enters a single period on a line
            lines.append(line)
        body = "
".join(lines)
        if not body.strip():
            set_status("Post aborted (empty input)")
            return None
        return body

    # default: editor
    body = edit_body()
    if not body.strip():
        set_status("Post aborted (empty body)")
        return None
    return body

# ---------- POSTING ----------
def post_article(nntp, group, subject=None, references=None):
    name = prompt("Enter your display name: ")
    email = prompt("Enter your email: ")
    if not subject:
        subject = prompt("Enter subject: ")

    body = get_post_body()
    if not body:
        return False

    msg = EmailMessage()
    msg["From"] = f"{name} <{email}>"
    msg["Newsgroups"] = group
    msg["Subject"] = subject
    if references:
        msg["References"] = references
    msg.set_content(body)

    try:
        nntp.post(msg.as_bytes())
        set_status("Article posted successfully")
        return True
    except Exception as e:
        set_status(f"Post failed: {e}")
        return False

# ---------- REPLY POSTING ----------
def post_reply(nntp, group, article_num):
    try:
        _, hinfo = nntp.head(str(article_num))
        headers = {}
        for raw in hinfo.lines:
            line = decode_body_line(raw)
            if ":" in line:
                k, v = line.split(":", 1)
                headers[k.lower()] = v.strip()

        subject = headers.get("subject", "(no subject)")
        if not RE_REPLY.match(subject):
            subject = "Re: " + subject

        refs = []
        if "references" in headers:
            refs.append(headers["references"])
        if "message-id" in headers:
            refs.append(headers["message-id"])

        return post_article(nntp, group, subject, " ".join(refs))
    except Exception as e:
        set_status(f"Reply failed: {e}")
        return False

# ---------- ARTICLE DISPLAY ----------
def show_article(nntp, num, group=None, allow_reply=False):
    try:
        _, hinfo = nntp.head(str(num))
        headers = {}
        for raw in hinfo.lines:
            line = decode_body_line(raw)
            if ":" in line:
                k, v = line.split(":", 1)
                headers[k.lower()] = v.strip()

        _, body = nntp.body(str(num))
        lines = [decode_body_line(l) for l in body.lines]

        paged_print([
            f"From: {headers.get('from','?')}",
            f"Date: {headers.get('date','?')}",
            f"Subject: {headers.get('subject','(no subject)')}",
            ""
        ] + lines)

        if allow_reply and group:
            print("
P=reply  (any other key to continue)")
            if get_key().lower() == "p":
                post_reply(nntp, group, num)

    except Exception as e:
        set_status(f"Fetch failed: {e}")

# ---------- REPLY SCANNING ----------
def scan_replies_xover(nntp, msgid, first, last):
    replies = []
    start = max(first, last - MAX_REPLY_SCAN)
    try:
        _, overviews = nntp.over((start, last))
    except:
        return replies

    for num, hdr in overviews:
        if msgid in hdr.get("references", ""):
            replies.append(int(num))
    return replies

# ---------- GROUP RELOAD ----------
def reload_group(nntp, group):
    try:
        _, _, first, last, _ = nntp.group(group)
        first = int(first)
        last = int(last)

        _, overviews = nntp.over((max(first, last - MAX_ARTICLES_LIST), last))

        posts = []
        for num, hdr in reversed(overviews):
            subject = hdr.get("subject", "")
            if RE_REPLY.match(subject):
                continue

            msgid = hdr.get("message-id", "")
            replies = sum(
                1 for _, h in overviews if msgid in h.get("references", "")
            ) if SHOW_REPLY_COUNT_MAIN else 0

            posts.append({
                "num": int(num),
                "subject": CLEAN_RE.sub("", subject),
                "from": CLEAN_RE.sub("", hdr.get("from", "?")),
                "date": hdr.get("date", "?"),
                "msgid": msgid,
                "replies": replies
            })

        return posts, first, last

    except Exception as e:
        set_status(f"Reload failed: {e}")
        return None, None, None

# ---------- GROUP BROWSER ----------
def browse_group(nntp, group):
    posts, first, last = reload_group(nntp, group)
    if not posts:
        return

    index = 0

    while index < len(posts):
        p = posts[index]

        print(f"
[{index+1}] #{p['num']}")
        print(f"From: {p['from']}")
        print(f"Date: {p['date']}")
        print(f"Replies: {p['replies'] if SHOW_REPLY_COUNT_MAIN else '?'}")
        print(f"Subject: {p['subject']}")

        show_status()
        print("
ENTER=read  SPACE=next  R=replies  N=new post  L=reload  J=jump  G=group  
Q=quit")

        key = get_key().lower()

        if key == "q":
            sys.exit(0)

        elif key == " ":
            index += 1

        elif key in ("
", "
"):
            show_article(nntp, p["num"], group, True)

        elif key == "n":
            if post_article(nntp, group):
                posts, first, last = reload_group(nntp, group)
                index = 0

        elif key == "l":
            posts, first, last = reload_group(nntp, group)
            index = 0
            set_status("Group reloaded")

        elif key == "j":
            val = prompt("Jump to post number: ")
            if val.isdigit():
                idx = int(val) - 1
                if 0 <= idx < len(posts):
                    index = idx

        elif key == "g":
            browse_group(nntp, prompt("New group: "))
            return

        elif key == "r":
            replies = scan_replies_xover(nntp, p["msgid"], first, last)
            if not replies:
                set_status("No replies found")
                continue

            for i, rnum in enumerate(replies):
                if i < len(replies) - 1:
                    print("
ENTER=next reply | SPACE=skip remaining | P=reply")
                else:
                    print("
End of replies | P=reply")

                k = get_key().lower()

                if k == " ":
                    set_status("Skipped remaining replies")
                    break
                elif k == "p":
                    post_reply(nntp, group, rnum)
                elif k in ("
", "
"):
                    show_article(nntp, rnum, group, True)

# ---------- MAIN ----------
def main():
    print(f"Connecting to {NNTP_SERVER}:{NNTP_PORT}...")
    nntp = nntplib.NNTP_SSL(NNTP_SERVER, NNTP_PORT, USERNAME, PASSWORD)
    set_status("Connected")
    browse_group(nntp, START_GROUP)
    nntp.quit()

if __name__ == "__main__":
    main()
-- 
https://mail.python.org/mailman3//lists/python-list.python.org

Reply via email to