ArielGlenn has submitted this change and it was merged.
Change subject: toy scripts playing with long-range compression
......................................................................
toy scripts playing with long-range compression
Change-Id: I06519466a4ffaf82371d50dea42e067b73e0ba6f
---
A toys/long-range-zippers/README.txt
A toys/long-range-zippers/blks-rzip.py
A toys/long-range-zippers/blks-standalone.py
3 files changed, 644 insertions(+), 0 deletions(-)
Approvals:
ArielGlenn: Looks good to me, approved
jenkins-bot: Verified
diff --git a/toys/long-range-zippers/README.txt
b/toys/long-range-zippers/README.txt
new file mode 100644
index 0000000..19af10a
--- /dev/null
+++ b/toys/long-range-zippers/README.txt
@@ -0,0 +1,55 @@
+These are a couple of scripts playing with alternate compressors for
+full-history dumps. The essential idea is that general-purpose compression
+tools weren't made for files like history dumps that have many repetitions
+of long chunks of content, sometimes spaced far apart. Using tools made for
+this sort of content can save you compression time or space.
+
+Both of these scripts share a lot of boilerplate to split uncompressed
+content into 10M chunks; that allows the script to stream input and output
+even if the underlying compressor can't, and allows random read access to
+the compressed content.
+
+
+* blks-standalone.py preprocesses the file in Python, then runs gzip or
+ bzip.
+
+ Specifically, some Python code produces 1) a sorted list of all *distinct*
+ lines in the input, and 2) indices into that array for every line of the
+ input. It gzips (or, with minor code changes, can bzip) both the indices
+ and the lines.
+
+ Here's a slightly simplified copy of the compression code to illustrate:
+
+ lines = input.split('\n')
+ line_list = sorted(set(lines))
+ line_dict = dict((line,i) for i,line in enumerate(line_list))
+ line_numbers = array.array('I', (line_dict[line] for line in lines))
+ nums = zip(line_numbers.tostring())
+ text = zip('\n'.join(line_list))
+ return pack_num(len(nums)) + nums + text
+
+ In a test on a c1.medium EC2 instance, it compressed 4.1G to 63M in 2m45s
+ wall time. Using bzip instead of gzip on the text, it gets the file to 56M
+ in 3m40s.
+
+
+* blks-rzip.py runs rzip, from http://rzip.samba.org/, which internally uses
+ a hashtable to find long repetitions (32+ chars) anywhere in the file,
+ then bzips the 'instructions' and the remaining text.
+
+ rzip's more efficient: in the saem test scenario above, it creates a 44M
+ file in three minutes. You can also run rzip -0 and gzip the output for
+ faster results with worse compression. rzip also works on all sorts of
+ files with long-range redundancy, whereas the line-based trick in
+ blks-standalone only works on text content. People obviously need the
+ rzip package installed to use it; there's a pure-Python decompressor in
+ the script, but it's slow.
+
+
+None of this is optimal. If anything, the performance numbers from this
+just indicate there may be gains from even blunt approaches to exploiting
+the similarity between revisions.
+
+Everything here is public domain.
+
+Randall Farmer, twotwotwo at gmail, 2013
\ No newline at end of file
diff --git a/toys/long-range-zippers/blks-rzip.py
b/toys/long-range-zippers/blks-rzip.py
new file mode 100755
index 0000000..68005ee
--- /dev/null
+++ b/toys/long-range-zippers/blks-rzip.py
@@ -0,0 +1,351 @@
+#!/usr/bin/env python
+# Randall Farmer, twotwotwo at gmail, 2013. Public domain. No warranty.
+import sys, subprocess, tempfile, gzip as gzip_mod, array, os, getopt, bisect,
ctypes, bz2
+from struct import pack, unpack
+from cStringIO import StringIO
+from binascii import crc32
+
+# pure-Python runzipper won't decompress more than this
+MAX_SIZE = 1024*1024*300
+
+# check our endianness
+LITTLE_ENDIAN = array.array('L',[1]).tostring()[0] == '\x01'
+
+MAGIC = 'BLKS'
+
+MISSING_RZIP = os.system('rzip -V > /dev/null 2> /dev/null')
+
+### MEMORY-TO-MEMORY RUNZIP FROM PYTHON
+
+class RZFile:
+ def __init__(self, f=None, s=None):
+ self.infile = StringIO(s or f.read())
+ file_header = self.infile.read(24)
+ # we like rzip | gzip, too
+ if file_header[0:2] == '\0x1f\0x8b':
+ self.infile.seek(0)
+ self.infile = gzip_mod.GzipFile(fileobj=self.infile)
+ file_header = self.infile.read(24)
+ self.initial_pos = 24
+ if file_header[0:4] != 'RZIP':
+ raise Exception('not rzip')
+ self.outsize, = unpack("!i", file_header[6:10])
+ if self.outsize > MAX_SIZE:
+ raise Exception('won\'t expand >%d bytes into RAM, saw %d' %
(MAX_SIZE, self.outsize))
+ self.outbuf = ctypes.create_string_buffer(self.outsize)
+ self.outoffs = 0
+ self.instreams = [None, None]
+ self.nexthdrs = [24,37]
+
+ def read_stream(self,which,bytes):
+ res = ''
+ if self.instreams[which]:
+ res = self.instreams[which].read(bytes)
+ if len(res) == bytes:
+ return res
+ # we're just starting, or crossed a chunk boundary.
+ while len(res) < bytes:
+ self.infile.seek(self.nexthdrs[which])
+ chunk_header = self.infile.read(13)
+ if not chunk_header:
+ raise Exception('no chunk header where expected')
+ ctype,c_len,buflen,next = unpack('<bLLL', chunk_header)
+ next += self.initial_pos
+ data = None
+ if ctype == 3: # uncompressed
+ data = self.infile.read(c_len)
+ elif ctype == 4: # bzip
+ data = bz2.decompress(self.infile.read(c_len))
+ else:
+ raise Exception('bad chunk type--corrupt file? runzip bug?')
+ self.instreams[which] = StringIO(data)
+ self.nexthdrs[which] = next
+ res = self.instreams[which].read(bytes)
+ if len(res) == bytes:
+ return res
+
+ def decompress(self):
+ while True:
+ hdr = self.read_stream(0,3)
+ if not hdr:
+ return self.outbuf # WE'RE DONE
+ type,len = unpack('<bH', hdr)
+ if type == 0: # literal
+ if not len:
+ return self.outbuf # WE'RE DONEISH
+ self.outbuf[self.outoffs:self.outoffs+len] =
self.read_stream(1,len)
+ elif type == 1: # match
+ offs, = unpack('<I', self.read_stream(0,4))
+ while len > offs: # repeats
+ self.outbuf[self.outoffs:self.outoffs+offs] = \
+ self.outbuf[self.outoffs-offs:self.outoffs]
+ len -= offs
+ self.outoffs += offs
+ self.outbuf[self.outoffs:self.outoffs+len] = \
+ self.outbuf[self.outoffs-offs:self.outoffs-offs+len]
+ else:
+ # rzip accepts >1 as a match, but only emits 1
+ raise Exception('unexpected instruction %d' % type)
+ self.outoffs += len
+
+def runzip(s):
+ rz = RZFile(s=s)
+ buf = rz.decompress()
+ return buf.raw
+
+compressor = lambda f,g: ['rzip', f, '-1o', g]
+decompressor = lambda f,g: ['rzip', '-d', f,'-o', g]
+suffix='.rz'
+mysuffix = '-blks'
+infile = sys.stdin
+outfile = sys.stdout
+blksz = 10000000
+
+skip = length = None
+inputoffs = outputoffs = 0
+blocks = array.array('L')
+
+### PORTABLE I/O NONENSE FOR WRAPPER FORMAT
+pack_num = lambda i: pack('>Q', i)
+unpack_num = lambda s: unpack('>Q', s)[0]
+read_num = lambda f: unpack_num(f.read(8))
+write_num = lambda f,i: f.write(pack_num(i))
+
+def read_chunk():
+ l = read_num(infile)
+ if l == 0:
+ return ''
+
+ if MISSING_RZIP:
+ return runzip(s=infile.read(l))
+
+ global decompressor
+ chunk = None
+ tmp = tempfile.NamedTemporaryFile(suffix=suffix,delete=False)
+ tmp.write(infile.read(l))
+ tmp.flush()
+ proc = subprocess.Popen(
+ decompressor(tmp.name, tmp.name[:-len(suffix)])
+ )
+ proc.wait()
+ chunk = open(tmp.name[:-len(suffix)]).read()
+ os.unlink(tmp.name[:-len(suffix)])
+ # r(un)zip deletes tmp if it works, but if we fail (and crash)
+ # we still should not leave tmp files around
+ try:
+ os.unlink(tmp.name)
+ except OSError:
+ # as expected
+ pass
+ return chunk
+
+def gunzip(s):
+ sio = StringIO(s)
+ return gzip_mod.GzipFile(fileobj=sio).read()
+
+def read_index():
+ infile.seek(-8, os.SEEK_END)
+ index_length = read_num(infile)
+ infile.seek(-(8 + index_length), os.SEEK_END)
+ gzipped_index = infile.read(index_length)
+ index_str = gunzip(gzipped_index)
+ # even indices are input offsets, odd are block offsets
+ index = array.array('L')
+ index.fromstring(index_str)
+ if LITTLE_ENDIAN:
+ index.byteswap()
+ formatted_index = []
+ for i in xrange(0, len(index), 2):
+ formatted_index.append((index[i],index[i+1]))
+ return formatted_index
+
+def read_random(index, skip, length):
+ # find block to start on, how much to skip
+ i = bisect.bisect_right(index, (skip,0))
+ inputstart,blockstart = index[i-1]
+ infile.seek(blockstart)
+
+ # output partial from the first chunk
+ outbytes = 0
+ blockskip = skip - inputstart
+ block = read_chunk()
+ if not block:
+ raise Exception('out of file')
+ truncated = block[blockskip:blockskip+length]
+ outfile.write(truncated)
+ outbytes += len(truncated)
+
+ # until we're past the end offset...
+ while outbytes < length:
+ # ...output chunks, truncating if we have to
+ block = read_chunk()
+ if not block:
+ raise Exception('out of file')
+ truncated = block[:length - outbytes]
+ outfile.write(truncated)
+ outbytes += len(truncated)
+
+
+def write_chunk():
+ chunk = infile.read(blksz)
+ if not chunk:
+ return False
+
+ global inputoffs, outputoffs, compressor
+ # track position in stdin
+ inputstart = inputoffs
+ inputoffs += len(chunk)
+ inputend = inputoffs
+
+ result = None
+ tmp = tempfile.NamedTemporaryFile(delete=False)
+ tmp.write(chunk)
+ tmp.flush()
+ proc = subprocess.Popen(
+ compressor(tmp.name, tmp.name + suffix)
+ )
+ proc.wait()
+ result = open(tmp.name + suffix).read()
+ os.unlink(tmp.name + suffix)
+ # r(un)zip deletes tmp if it works, but if we fail (and crash)
+ # we still should not leave tmp files around
+ try:
+ os.unlink(tmp.name)
+ except OSError:
+ # expected: r(un)zip deletes the original
+ pass
+
+ outputstart = outputoffs
+ write_num(outfile,len(result))
+ outputoffs += 8
+ outfile.write(result)
+ outputoffs += len(result)
+
+ blocks.extend(
+ (inputstart,outputstart)
+ )
+ return True
+
+def gzip(s):
+ sio = StringIO()
+ gzipfile = gzip_mod.GzipFile(mode='w',fileobj=sio)
+ gzipfile.write(s)
+ gzipfile.close()
+ return sio.getvalue()
+
+def infile_to_outfile():
+ outfile.write(MAGIC)
+ global outputoffs
+ outputoffs += len(MAGIC)
+ while write_chunk():
+ pass
+ write_num(outfile,0)
+ if LITTLE_ENDIAN:
+ blocks.byteswap()
+ zipped_index = gzip(blocks.tostring())
+ outfile.write(zipped_index)
+ write_num(outfile,len(zipped_index))
+
+def main():
+ global skip, length, infile, outfile, suffix
+ global compressor_name, compressor, decompressor
+ decompress = force = pipe = keep = False
+ level = None
+ opts, args = getopt.gnu_getopt(sys.argv[1:],'dcp:fk19',['skip=','length='])
+ for o,v in opts:
+ if o == '--skip':
+ skip = int(v)
+ elif o == '--length':
+ length = int(v)
+ elif o == '-d':
+ decompress = True
+ elif o == '-c':
+ pipe = True
+ elif o == '-f': # write to tty, overwrite
+ force = True
+ elif o == '-k': # keep original file
+ keep = True
+ else:
+ raise Exception('%s unsupported' % o)
+
+ if len(args) > 2:
+ raise Exception('I only support infile and outfile as args')
+
+ if decompress is None:
+ if 'blkunz' in sys.argv[0]:
+ decompress = True
+ else:
+ decompress = False
+
+ if len(args) > 0:
+ infile = open(args[0])
+ if decompress is False and args[0].endswith(mysuffix) and not force:
+ raise Exception('won\'t recompress %s without -f' % mysuffix)
+
+ # pick a default filename if needed
+ outfn = cmdline_outfn = None
+ if len(args) > 1:
+ if pipe:
+ raise Exception('-c (pipe) doesn\'t make sense with an outfile')
+ outfn = args[1]
+ else:
+ outfn = None
+ if decompress and infile.name.endswith(suffix + mysuffix) and not
length:
+ outfn = infile.name[:-len(suffix + mysuffix)]
+ if not decompress and len(args) == 1:
+ outfn = args[0] + suffix + mysuffix
+
+ # open it, unless we'd zip to a terminal or -c was passed in
+ if outfn and not pipe:
+ exists = False
+ try:
+ os.stat(outfn)
+ if not force:
+ raise Exception('won\'t overwrite %s without -f' % outfn)
+ except OSError:
+ # file not found, yay
+ pass
+ outfile = open(outfn, 'w')
+
+ if outfile.isatty() and not decompress and not force:
+ raise Exception('won\'t compress to terminal without -f')
+
+ if decompress:
+ magic = infile.read(4)
+ if magic != MAGIC:
+ raise Exception('sorry; the file is not from this program')
+ if skip or length:
+ if skip is None or length is None:
+ raise Exception('--skip and --length go together')
+ index = read_index()
+ read_random(index,skip,length)
+ else:
+ chunk = read_chunk()
+ while chunk:
+ outfile.write(chunk)
+ chunk = read_chunk()
+ else:
+ if MISSING_RZIP:
+ sys.stderr.write(
+ "You don't have rzip installed. Consider:\n"
+ " sudo apt-get install rzip\n"
+ " -or-\n"
+ " yum install rzip\n"
+ " -or-\n"
+ " http://rzip.samba.org/\n"
+ "Decompression is still possible, but slow."
+ )
+ sys.exit(1)
+ if skip or length:
+ raise Exception('--skip and --length are decompress-only')
+ infile_to_outfile()
+
+ if outfn and not (pipe or keep):
+ os.unlink(infile.name)
+
+if __name__ == '__main__':
+ main()
+else:
+ raise Exception('not a module')
+
+
diff --git a/toys/long-range-zippers/blks-standalone.py
b/toys/long-range-zippers/blks-standalone.py
new file mode 100755
index 0000000..c049605
--- /dev/null
+++ b/toys/long-range-zippers/blks-standalone.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python
+# Randall Farmer, twotwotwo at gmail, 2013. Public domain. No warranty.
+import sys, gzip as gzip_mod, array, os, getopt, bisect, ctypes, bz2
+from struct import pack, unpack
+from cStringIO import StringIO
+
+LITTLE_ENDIAN = array.array('L',[1]).tostring()[0] == '\x01'
+pack_num = lambda i: pack('>Q', i)
+unpack_num = lambda s: unpack('>Q', s)[0]
+read_num = lambda f: unpack_num(f.read(8))
+write_num = lambda f,i: f.write(pack_num(i))
+
+def gzip(s):
+ sio = StringIO()
+ gzipfile = gzip_mod.GzipFile(mode='w',fileobj=sio,compresslevel=6)
+ gzipfile.write(s)
+ gzipfile.close()
+ return sio.getvalue()
+
+def unzip(s):
+ if s[:3] == 'BZh':
+ return bz2.decompress(s)
+ sio = StringIO(s)
+ return gzip_mod.GzipFile(fileobj=sio).read()
+
+nums_compressor = gzip
+text_compressor = gzip
+
+def compress(s):
+ lines = s.split('\n')
+ line_list = sorted(set(lines))
+ # if the line numbers would overflow 16 bits, bail
+ if len(line_list) > 65535:
+ return pack_num(0) + text_compressor(s)
+ line_dict = dict((line,i) for i,line in enumerate(line_list))
+ line_numbers = array.array('I', (line_dict[line] for line in lines))
+ if LITTLE_ENDIAN:
+ line_numbers.byteswap()
+ nums = nums_compressor(line_numbers.tostring())
+ text = text_compressor('\n'.join(line_list))
+ return pack_num(len(nums)) + nums + text
+
+def decompress(s):
+ nums_length = unpack_num(s[0:8])
+ if nums_length == 0:
+ return unzip(s[8:])
+ line_numbers = array.array('I', unzip(s[8:8+nums_length]))
+ if LITTLE_ENDIAN:
+ line_numbers.byteswap()
+ lines = unzip(s[8+nums_length:]).split('\n')
+ return '\n'.join(lines[i] for i in line_numbers)
+
+MAGIC = 'BLKS'
+
+infile = sys.stdin
+outfile = sys.stdout
+blksz = 10000000
+
+skip = length = None
+inputoffs = outputoffs = 0
+blocks = array.array('L')
+mysuffix = '.test'
+
+def read_chunk():
+ l = read_num(infile)
+ if l == 0:
+ return ''
+ return decompress(infile.read(l))
+
+def read_index():
+ infile.seek(-8, os.SEEK_END)
+ index_length = read_num(infile)
+ infile.seek(-(8 + index_length), os.SEEK_END)
+ gzipped_index = infile.read(index_length)
+ index_str = unzip(gzipped_index)
+ # even indices are input offsets, odd are block offsets
+ index = array.array('L')
+ index.fromstring(index_str)
+ if LITTLE_ENDIAN:
+ index.byteswap()
+ formatted_index = []
+ for i in xrange(0, len(index), 2):
+ formatted_index.append((index[i],index[i+1]))
+ return formatted_index
+
+def read_random(index, skip, length):
+ # find block to start on, how much to skip
+ i = bisect.bisect_right(index, (skip,0))
+ inputstart,blockstart = index[i-1]
+ infile.seek(blockstart)
+
+ # output partial from the first chunk
+ outbytes = 0
+ blockskip = skip - inputstart
+ block = read_chunk()
+ if not block:
+ raise Exception('out of file')
+ truncated = block[blockskip:blockskip+length]
+ outfile.write(truncated)
+ outbytes += len(truncated)
+
+ # until we're past the end offset...
+ while outbytes < length:
+ # ...output chunks, truncating if we have to
+ block = read_chunk()
+ if not block:
+ raise Exception('out of file')
+ truncated = block[:length - outbytes]
+ outfile.write(truncated)
+ outbytes += len(truncated)
+
+
+def write_chunk():
+ chunk = infile.read(blksz)
+ if not chunk:
+ return False
+
+ global inputoffs, outputoffs
+ inputstart = inputoffs
+ outputstart = outputoffs
+
+ inputoffs += len(chunk)
+ result = compress(chunk)
+ write_num(outfile,len(result))
+ outputoffs += 8
+ outfile.write(result)
+ outputoffs += len(result)
+
+ blocks.extend(
+ (inputstart,outputstart)
+ )
+ return True
+
+def infile_to_outfile():
+ outfile.write(MAGIC)
+ global outputoffs
+ outputoffs += len(MAGIC)
+ while write_chunk():
+ pass
+ write_num(outfile,0)
+ if LITTLE_ENDIAN:
+ blocks.byteswap()
+ zipped_index = gzip(blocks.tostring())
+ outfile.write(zipped_index)
+ write_num(outfile,len(zipped_index))
+
+def main():
+ global skip, length, infile, outfile
+ global compressor_name, compressor, decompressor
+ want_decompress = force = pipe = keep = False
+ level = None
+ opts, args = getopt.gnu_getopt(sys.argv[1:],'dcp:fk19',['skip=','length='])
+ for o,v in opts:
+ if o == '--skip':
+ skip = int(v)
+ elif o == '--length':
+ length = int(v)
+ elif o == '-d':
+ want_decompress = True
+ elif o == '-c':
+ pipe = True
+ elif o == '-f': # write to tty, overwrite
+ force = True
+ elif o == '-k': # keep original file
+ keep = True
+ else:
+ raise Exception('%s unsupported' % o)
+
+ if len(args) > 2:
+ raise Exception('I only support infile and outfile as args')
+
+ if want_decompress is None:
+ if 'blkunz' in sys.argv[0]:
+ want_decompress = True
+ else:
+ want_decompress = False
+
+ if len(args) > 0:
+ infile = open(args[0])
+ if want_decompress is False and args[0].endswith(mysuffix) and not
force:
+ raise Exception('won\'t recompress %s without -f' % mysuffix)
+
+ # pick a default filename if needed
+ outfn = cmdline_outfn = None
+ if len(args) > 1:
+ if pipe:
+ raise Exception('-c (pipe) doesn\'t make sense with an outfile')
+ outfn = args[1]
+ else:
+ outfn = None
+ if want_decompress and infile.name.endswith(mysuffix) and not length:
+ outfn = infile.name[:-len(mysuffix)]
+ if not want_decompress and len(args) == 1:
+ outfn = args[0] + mysuffix
+
+ # open it, unless we'd zip to a terminal or -c was passed in
+ if outfn and not pipe:
+ exists = False
+ try:
+ os.stat(outfn)
+ if not force:
+ raise Exception('won\'t overwrite %s without -f' % outfn)
+ except OSError:
+ # file not found, yay
+ pass
+ outfile = open(outfn, 'w')
+
+ if outfile.isatty() and not want_decompress and not force:
+ raise Exception('won\'t compress to terminal without -f')
+
+ if want_decompress:
+ magic = infile.read(4)
+ if magic != MAGIC:
+ raise Exception('sorry; the file is not from this program')
+ if skip or length:
+ if skip is None or length is None:
+ raise Exception('--skip and --length go together')
+ index = read_index()
+ read_random(index,skip,length)
+ else:
+ chunk = read_chunk()
+ while chunk:
+ outfile.write(chunk)
+ chunk = read_chunk()
+ else:
+ if skip or length:
+ raise Exception('--skip and --length are decompress-only')
+ infile_to_outfile()
+
+ if outfn and not (pipe or keep):
+ os.unlink(infile.name)
+
+if __name__ == '__main__':
+ main()
+else:
+ raise Exception('not a module')
+
+
--
To view, visit https://gerrit.wikimedia.org/r/63139
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I06519466a4ffaf82371d50dea42e067b73e0ba6f
Gerrit-PatchSet: 2
Gerrit-Project: operations/dumps
Gerrit-Branch: ariel
Gerrit-Owner: Twotwotwo <[email protected]>
Gerrit-Reviewer: ArielGlenn <[email protected]>
Gerrit-Reviewer: Nemo bis <[email protected]>
Gerrit-Reviewer: jenkins-bot
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits