I thought I'd posted this before, but it doesn't look like I did. icb
is a chat system that helps people waste their time by typing inane
comments at each other, similar to IRC; its primary advantage is that
it is technically more primitive than IRC, so most people don't bother
to use it.
Rather than manage command-line history, session logging, and
command-line editing myself, I run this under Emacs shell-mode, which
takes care of all of those things for me.
To log in, you must utter an incantation like the following after
starting the client:
/eval self.icbconn.login('robert', 'Bobbing', '1')
This annoys me slightly every time I restart the client, which happens
every few months, which is not often enough to get me to fix it.
Perhaps at some point my pride will cut in.
This client has a couple of interesting features other chat clients
could probably benefit from:
- it always timestamps chat logs, but does it in a way unobtrusive
enough that you aren't tempted to turn it off
- you can upgrade the client to a new version without stopping and
restarting it by typing '/reload icb'.
It also has some bugs:
- it does not quit when the server tells it to quit with a 'g' message
- it does not word-wrap private messages
- it writes to stdin instead of stdout because it was easier that way
#!/usr/bin/python
#internationalCB client
#216.200.125.157
import asyncore, socket, string, traceback, sys, time, re
# formatters
def format_msg(type, msg):
    """Render a two-field (nick, text) message as ``*nick* text``.

    `type` is the packet's type byte.  It is accepted so that every
    formatter can be called with the same arguments, but is unused here.
    `msg` must contain exactly two items.
    """
    sender, body = msg
    return "*%s* %s" % (sender, body)
def format_openmsg(type, msg):
    """Render a two-field (nick, text) message as ``<nick> text``.

    `type` is the packet's type byte (unused here); `msg` must contain
    exactly two items.
    """
    speaker, said = msg
    return "<%s> %s" % (speaker, said)
def format_info_msg(type, msg):
    """Render a two-field (category, text) message as ``[=category=] text``.

    `type` is the packet's type byte (unused here); `msg` must contain
    exactly two items.
    """
    category, detail = msg
    return "[=%s=] %s" % (category, detail)
def format_beep(type, msg):
    """Announce that the user named in msg[0] beeped us.

    The returned text embeds a BEL character so a terminal will ring.
    """
    beeper = msg[0]
    return "[=Beep=]\a %s just beeped you" % beeper
def format_tick(type, msg):
    """Format the epoch timestamp in msg[0] as a local-time "Bong" line."""
    stamp = time.localtime(msg[0])
    # Only year, month, day, hour, minute and second are displayed.
    return ("[=Bong=] The time is now %04d-%02d-%02d %02d:%02d:%02d"
            % tuple(stamp[:6]))
def format_cmd_output(type, msg):
    """Format a command-output message according to its sub-type, msg[0].

    'co' is shown as plain output text (msg[1]), 'wl' as a who-listing
    line; any other sub-type is shown as the repr of all its fields so
    that nothing is silently dropped.
    """
    kind = msg[0]
    if kind == 'co':
        return "*> " + msg[1]
    if kind == 'wl':
        return "*> " + format_who_line(msg)
    return "*>-" + repr(msg)
def modfmt(mod):
    """Return the marker printed in front of a nick in a who listing.

    A single-space flag yields ' '; any other flag value yields '*'.
    (Presumably the flag marks group moderators -- confirm against the
    protocol description.)
    """
    if mod != ' ':
        return '*'
    return ' '
def idlefmt(idle):
    """Format a duration in seconds compactly: '0', '42s', '3m7s' or '2h5m'.

    `idle` may be anything int() accepts (an int, a float, or a string of
    digits); any fractional part is discarded.

    divmod() is used rather than `/` so that the minute and hour counts
    stay whole numbers even where `/` means true division (Python 3, or
    `from __future__ import division`), where the old code would have
    printed things like '1.5m30s'.
    """
    idle = int(idle)
    if idle == 0:
        return '0'
    if idle < 60:
        return '%ss' % idle
    minutes, seconds = divmod(idle, 60)
    if idle < 3600:
        return "%sm%ss" % (minutes, seconds)
    hours, minutes = divmod(minutes, 60)
    return "%sh%sm" % (hours, minutes)
def timefmt(then):
    """Format how long ago the epoch second `then` was, using idlefmt()."""
    elapsed = time.time() - int(then)
    return idlefmt(elapsed)
def format_who_line(msg):
    """Lay out one who-listing entry in fixed-width columns.

    `msg` must hold exactly nine fields: the 'wl' tag, the flag passed to
    modfmt(), nick, idle seconds, a field that is not displayed, login
    time (epoch seconds), user name, host and a trailing status field.
    """
    _tag, mod, nick, idle, _unused, logintime, user, host, reg = msg
    columns = (modfmt(mod) + nick, idlefmt(idle), timefmt(logintime),
               user, host, reg)
    return "%13s %8s %8s %s@%s %s" % columns
def format_login_pkt(type, msg):
    """Return an empty string: login packets produce no visible text."""
    return ''
def format_protocol_pkt(type, msg):
    """Describe the server's protocol announcement.

    msg[0] is the protocol version; msg[1] (host) and msg[2] (server
    software) are optional and replaced by placeholders when absent.
    All three are shown via repr().
    """
    ver = msg[0]
    host = "unknown host"
    server = "unknown server"
    if len(msg) > 1:
        host = msg[1]
    if len(msg) > 2:
        server = msg[2]
    shown = (repr(ver), repr(host), repr(server))
    return "* protocol version %s on %s running %s" % shown
def format_error(type, msg):
    """Render an error message, showing the repr of its first field."""
    detail = msg[0]
    return "!!!ERROR!!! " + repr(detail)
def demoronize(msg):
    """Expand chat shorthand in `msg` and return the cleaned-up text.

    Stand-alone "u" becomes "you", stand-alone "r" becomes "are", a run
    of three or more dots collapses to a single dot, and a stand-alone
    lower-case "i" is capitalised.
    """
    # Applied in this order; each pattern only matches whole words
    # (or, for the dots, a run of at least three).
    substitutions = (
        (r'\bu\b', 'you'),
        (r'\br\b', 'are'),
        (r'\.{3,}', '.'),
        (r'\bi\b', 'I'),
    )
    for pattern, replacement in substitutions:
        msg = re.sub(pattern, replacement, msg)
    return msg
def wrap(maxlen, msg):
    """Break a string into a list of maxlen-char or less "lines".

    Word-wrap if possible.  No characters are added or removed: joining
    the returned pieces gives back `msg`, and a space at which a line is
    broken stays at the end of that line.

    The pieces are produced iteratively, so very long input cannot
    exhaust the recursion limit (the earlier recursive form used one
    stack frame per output line).

    Raises ValueError if `msg` would have to be split but `maxlen` is
    less than 1, since no progress could ever be made.
    """
    lines = []
    while len(msg) > maxlen:
        if maxlen < 1:
            raise ValueError("maxlen must be at least 1 to split text, "
                             "got %r" % (maxlen,))
        # XXX: allow more than just space?
        word_break = msg.rfind(' ', 0, maxlen)
        if word_break == -1:  # no word break!
            wrap_point = maxlen
        else:
            # If moving the word onto the next line isn't going to
            # decrease the number of times the word gets broken,
            # just go ahead and break it here.
            word_end = msg.find(' ', word_break + 1)
            if word_end == -1:
                word_end = len(msg)
            word_len = word_end - word_break
            # If the word is shorter than maxlen, it will fit on one
            # line, which means it doesn't need to be broken;
            # otherwise, if the "slack" characters before and after
            # the full maxlen-length lines will all fit at the end of
            # this line, put them there; but if they won't, putting
            # some of them at the end of this line won't help anyway.
            if word_len > maxlen and word_len % maxlen < maxlen - word_break:
                wrap_point = maxlen
            else:
                wrap_point = word_break + 1
        lines.append(msg[:wrap_point])
        msg = msg[wrap_point:]
    # Whatever is left fits on one line (this is the whole input when
    # it was short enough to begin with, including the empty string).
    lines.append(msg)
    return lines
def dump_exc():
    """Return the traceback of the exception being handled as one string."""
    exc_type, exc_value, exc_tb = sys.exc_info()
    pieces = traceback.format_exception(exc_type, exc_value, exc_tb)
    return ''.join(pieces)
class upgradable:
    """Mixin that lets a live object switch to a newer version of its class."""

    def upgrade(self, mod):
        """Rebind this instance to the same-named class found in `mod`.

        After the switch, postupgrade() is called so that the new class
        can adjust the existing instance.
        """
        replacement = getattr(mod, self.__class__.__name__)
        self.__class__ = replacement
        self.postupgrade()

    def postupgrade(self):
        """Hook run after upgrade(); does nothing by default."""
        pass
class icb(asyncore.dispatcher_with_send, upgradable):
    """Connection to an ICB server.

    Incoming bytes are split into packets, assembled into messages,
    formatted and written to `self.output` (anything with a write()
    method; sys.stdout by default).  Outgoing requests are built with
    construct_msg_pkts() / login_pkt() and queued for sending.

    Message types without an entry in `formatters` are not acted upon;
    they are simply shown as the repr of (type, fields).
    """

    # Maps a message's type byte to the function that renders it.
    formatters = {'a': format_login_pkt,
                  'b': format_openmsg,
                  'c': format_msg,
                  'd': format_info_msg,
                  'e': format_error,
                  'i': format_cmd_output,
                  'j': format_protocol_pkt,
                  'k': format_beep}
    # Type bytes whose fields are (nick, text) and therefore go through
    # filterpersonmsg() before being formatted.
    personmsgs = 'bc'
    tickinterval = 3600  # one hour
    # Data from continuation packets (length byte \0) still waiting for
    # the packet that ends the message.  Kept as a class-level default so
    # that instances created by an older version of this class, then
    # upgraded in place, have it without __init__ being re-run.
    partial_msg = ''

    def __init__(self, hostport):
        """Start a non-blocking connection to `hostport`, a (host, port) pair."""
        self.addr = hostport
        self.connected = 0
        self.out_buffer = ''
        self.set_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        self.socket.setblocking(0)
        self.connect(hostport)
        # in ICB, messages are composed of packets.  Most messages are
        # transmitted in single packets; but packets begin with a length byte,
        # and a length byte of \0 means that the packet is 255 bytes long and
        # the next packet contains more of the same message.
        # Messages consist of a type byte, followed by zero or more fields
        # separated by control-A, followed by a NUL byte.
        self.incoming_pkt = ''   # bytes received but not yet parsed
        self.partial_msg = ''
        self.output = sys.stdout
        self.debug = 0
        self.lasttick = 0        # time of the last timestamp line written
        self.morons = {}         # lower-cased nicks whose text is cleaned up

    def toggle_debug(self):
        """Switch echoing of outgoing packets on or off."""
        self.debug = not self.debug

    def send_output_to(self, other):
        """Write all further output to `other` (needs a write() method)."""
        self.output = other

    def handle_read(self):
        """asyncore callback: buffer newly arrived bytes and parse them."""
        data = self.recv(4096)
        self.incoming_pkt = self.incoming_pkt + data
        self.try_to_parse_pkt()

    def try_to_parse_pkt(self):
        """Handle every complete packet currently sitting in the buffer.

        A packet whose length byte is \\0 is a 255-byte continuation: its
        data is accumulated and prepended to the packet that completes
        the message.  (Previously such a packet was never consumed, so
        the buffer stalled for good.)  An incomplete packet is left in
        the buffer until more data arrives.
        """
        while self.incoming_pkt != '':
            length = ord(self.incoming_pkt[0])
            continued = (length == 0)
            if continued:
                length = 255
            if len(self.incoming_pkt) <= length:
                # buffer is too short for this packet; deal with it later
                return
            pktdata = self.incoming_pkt[1:length + 1]
            self.incoming_pkt = self.incoming_pkt[length + 1:]
            if continued:
                self.partial_msg = self.partial_msg + pktdata
                continue
            msgdata = self.partial_msg + pktdata
            self.partial_msg = ''
            try:
                self.handle_message(msgdata)
            except:
                # Deliberately broad: one malformed message must not take
                # the connection down; show the traceback and carry on.
                self.output.write(dump_exc())

    def tick(self):
        """Insert the current time into the output stream.

        When reading chat logs, I often find that I wish I had some
        idea of when something or another was said.  Some clients
        provide for timestamping every message, but this is obtrusive
        in a simple ASCII stream, and if it provided information I
        occasionally want, like what day things were said, it would be
        far more obtrusive.  Since it's so obtrusive, it's an option
        --- and it's usually turned off, which means the information
        is rarely present when I want it!

        So this client inserts timestamps in the log every hour ---
        or, more accurately, when it has been an hour since the last
        message, the next new message will have a timestamp added
        above it.  This keeps timestamp volume to a maximum of one
        timestamp per message or one timestamp per hour.
        """
        now = time.time()
        if now - self.lasttick > self.tickinterval:
            self.output.write(format_tick('tick', (now,)) + "\n")
            self.lasttick = now

    def filterpersonmsg(self, msgfields):
        """Return (nick, text), cleaning the text if nick is a listed moron.

        `msgfields` must contain exactly two items.
        """
        nick, text = msgfields
        if nick.lower() in self.morons:
            text = demoronize(text)
        return nick, text

    def handle_message(self, msgdata):
        """Format one complete message and write it to the output.

        `msgdata` is the type byte, the control-A separated fields and
        the terminating NUL.
        """
        self.tick()
        type = msgdata[0]
        # [1:-1] drops the type byte and the trailing NUL
        msgfields = msgdata[1:-1].split('\001')
        if type in self.formatters:
            if type in self.personmsgs:
                msgfields = self.filterpersonmsg(msgfields)
            self.output.write(self.formatters[type](type, msgfields) + "\n")
        else:
            self.output.write(repr((type, msgfields)) + "\n")

    def login(self, login, nick, group='1'):
        """Send a login packet for user `login` as `nick`, joining `group`."""
        self.send(login_pkt(login, nick, group))

    def cmd(self, cmd, args):
        """Send server command `cmd` with its argument string `args`."""
        self.send(construct_msg_pkts('h', [cmd, args]))

    def openmsg(self, msg):
        """Send `msg` as an open message, split into packet-sized pieces."""
        # max message length, exclusive of length byte, is 255;
        # that includes the type byte and the terminating \0.
        for chunk in wrap(253, msg):
            self.send(construct_msg_pkts('b', [chunk]))

    # The server doesn't support these packets.
    # def ping(self, msgid):
    #     self.send(construct_msg_pkts('l', [msgid]))

    def pong(self, msgid):
        """Send an 'm' packet carrying `msgid`."""
        self.send(construct_msg_pkts('m', [msgid]))

    def conv(self, nick):
        """Return a Conversation callable that sends its argument to `nick`."""
        return Conversation(self, nick)

    def send(self, msg):
        """Queue raw packet data for sending, echoing it first in debug mode."""
        if self.debug:
            # newline added so the echo does not run into the next output line
            self.output.write("[sending] %s\n" % repr(msg))
        return asyncore.dispatcher_with_send.send(self, msg)

    def moron(self, nick):
        "Toggle nick's moron status."
        nick = nick.lower()
        if nick in self.morons:
            del self.morons[nick]
            self.output.write("[=Unmoron=] %s is no longer a moron\n" % nick)
        else:
            self.morons[nick] = 1
            self.output.write("[=Moron=] %s is a moron\n" % nick)
def construct_msg_pkts(type, fields):
    """Build the wire form of one ICB message as a single packet.

    The result is a length byte, the `type` byte, the `fields` joined by
    control-A, and a terminating NUL.  The length byte counts everything
    after itself.

    Raises ValueError if a field contains control-A or NUL (they would
    corrupt the framing), or if the message does not fit in one packet
    (more than 255 bytes after the length byte).  A real exception class
    is used because raising a plain string is itself an error on
    Python 2.6 and later.
    """
    for field in fields:
        if '\001' in field or '\000' in field:
            raise ValueError("You can't put that in an ICB field! %r"
                             % (field,))
    msg = type + '\001'.join(fields) + '\000'
    if len(msg) > 255:
        # chr() cannot encode this length; say so explicitly rather than
        # relying on chr()'s own error (or lack of one).
        raise ValueError("ICB message too long for one packet: %d bytes "
                         "(maximum is 255)" % len(msg))
    return chr(len(msg)) + msg
def login_pkt(login, nick, group='1'):
    """Build the type-'a' packet that logs `login` in as `nick` in `group`.

    The last two fields are the fixed command word 'login' and an empty
    string.
    """
    fields = [login, nick, group, 'login', '']
    return construct_msg_pkts('a', fields)
class Conversation:
    """Callable shortcut for sending messages to one particular nick.

    Calling the object with some text issues the 'm' command on the
    stored connection with "<nick> <text>" as its argument.
    """

    def __init__(self, connection, nick):
        self.nick = nick
        self.conn = connection

    def __call__(self, msg):
        payload = '%s %s' % (self.nick, msg)
        self.conn.cmd('m', payload)
class UI(asyncore.dispatcher_with_send, upgradable):
    """Line-oriented user interface for an icb connection.

    Lines read from the attached channel are either /commands or text to
    send as an open message.  The object also serves as the connection's
    output sink: the connection calls write() to show text to the user.
    The channel itself is attached later with set_socket().
    """

    def __init__(self, icbconn):
        """Attach to `icbconn` and redirect its output to this UI."""
        self.icbconn = icbconn
        self.icbconn.send_output_to(self)
        self.out_buffer = ''
        self.inbuf = ''   # input received but not yet split into lines

    def handle_read(self):
        """asyncore callback: buffer new input and process complete lines."""
        data = self.socket.recv(4096)
        if not data:
            # End of input.  The descriptor would stay readable forever,
            # turning the poll loop into a busy loop, so shut down as
            # if the user had typed /quit.
            self.cmd('quit', '')
            return
        self.inbuf = self.inbuf + data
        self.try_to_do_lines()  # maybe this method should be called 'snort'

    def try_to_do_lines(self):
        """Run do_line() on every complete line in the input buffer."""
        while 1:
            nl = self.inbuf.find('\n')
            if nl == -1:
                return
            line = self.inbuf[:nl]
            self.inbuf = self.inbuf[nl + 1:]
            try:
                self.do_line(line)
            except:
                # Deliberately broad: a failing line must not stop the
                # remaining input from being processed.
                sys.stdout.write(dump_exc() + "\n")

    def do_line(self, line):
        """Interpret one input line: a /command, or text to send."""
        line = line.rstrip('\r\n')
        if line == '':
            self.send("- empty message not sent\n")
        elif line[0] == '/':
            # "/name rest of line" -> cmd('name', 'rest of line')
            sp = line.find(' ')
            if sp == -1:
                self.cmd(line[1:], '')
            else:
                self.cmd(line[1:sp], line[sp + 1:])
        else:
            self.icbconn.openmsg(line)

    def cmd(self, cmd, args):
        """Execute a /command.

        eval, exec, moron, reload and quit are handled locally; anything
        else is passed to the server as a command.
        """
        if cmd == 'eval':
            # SECURITY: evaluates whatever the user typed, with `self`
            # in scope.  Only expose this UI to trusted input.
            try:
                result = repr(eval(args + "\n"))
            except:
                self.send(dump_exc())
            else:
                self.send("- %s\n" % result)
        elif cmd == 'exec':
            # SECURITY: same caveat as 'eval' above.
            try:
                exec(args)
            except:
                self.send(dump_exc())
            else:
                self.send("- OK\n")
        elif cmd == 'moron':
            self.icbconn.moron(args)
        elif cmd == 'reload':  # upgrades a running client
            modname = args
            if modname in sys.modules:
                mod = reload(__import__(modname))
            else:
                mod = __import__(modname)
            self.upgrade(mod)
            self.icbconn.upgrade(mod)
            self.send("- upgraded\n")
        elif cmd == 'quit':
            # Close every channel so that the poll loop runs out of work.
            # Depending on the asyncore version, socket_map maps
            # descriptor -> channel or channel -> flag; close whichever
            # of the pair is the channel.  (Calling close() on the key
            # fails when the keys are integer descriptors.)
            for key, value in list(asyncore.socket_map.items()):
                if hasattr(value, 'close'):
                    channel = value
                else:
                    channel = key
                channel.close()
                try:
                    del asyncore.socket_map[key]
                except KeyError:
                    pass  # close() had already removed the entry
        else:
            self.icbconn.cmd(cmd, args)

    def write(self, data):
        """File-like alias for send(), used by the connection for output."""
        return self.send(data)
def run_server(server=('216.200.125.157', 7326), listen=('127.0.0.1', 2743)):
    """Run the client with its user interface on a local TCP socket.

    Connects to the ICB server at `server`, then waits for exactly one
    connection on `listen` and uses it as the UI channel.  Returns once
    every channel has been closed.

    SECURITY: the UI honours /eval and /exec, so whoever connects to the
    listening port first can run arbitrary code in this process.
    """
    #icbconn = icb(('default.icb.net', 7326))
    #old server: icbconn = icb(('165.227.32.110', 7326))
    icbconn = icb(server)
    ui = UI(icbconn)
    ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        ss.bind(listen)
        ss.listen(5)
        conn, addr = ss.accept()
    finally:
        # Only one UI connection is ever served, and the listening socket
        # must not leak if bind/listen/accept fails.
        ss.close()
    conn.setblocking(0)
    ui.set_socket(conn)
    while asyncore.socket_map:
        asyncore.poll(1)
def cmdline_client():
    """Run the client with the process's standard input as its UI channel.

    The UI both reads from and writes to the wrapped stdin descriptor.
    Returns once every channel has been closed.
    """
    connection = icb(('default.icb.net', 7326))
    frontend = UI(connection)
    # Wrap the descriptor so asyncore can poll it like a socket.
    terminal = asyncore.file_wrapper(sys.stdin.fileno())
    terminal.send = terminal.write  # python 1.5.2 backwards compat hack
    frontend.set_socket(terminal)
    while asyncore.socket_map.keys():
        asyncore.poll(1)
# Start the stdin-driven client when run as a script (not when imported,
# e.g. by '/reload').
if __name__ == "__main__":
    cmdline_client()
--
main(int c,char**v){char a[]="ks\0Okjs!\0\0\0\0\0\0\0",*p,*t=strchr
(*++v,64),*o=a+4;int s=socket(2,2,0);*(short*)a=2;p=t;while(*p)(*p++&48)
-48?*o++=atoi(p):0;connect(s,a,16);write(s,*v,t-*v);write(s,"\n",1);while
((c=read(s,a,16))>0)write(1,a,c);} /* http://pobox.com/~kragen/puzzle.html */