Hi all, I got problems similar to those described by Erik Enge a few weeks ago with indexing a somewhat larger amount of text (~500 MB in ~194000 objects): Zope tends to "eat up" all available memory (640MB in my case) and swap space. The main problem seems to be that the classes ZCatalog, Catalog, UnTextIndex and [Globbing]Lexicon try to do all partial indexing tasks at once: The Catalog instance collects meta data for each indexed object; the UnTextIndex instance adds new words to the Lexicon instance, builds an entry for each indexed object in _unindex and - most important - updates _index "just in time". The lexicon contains more than 1 million entries for "my" data, and the _index entries for common words may contain a considerable number of references to indexed objects. I am relatively new to Python and Zope, so I don't have any idea how much memory is required for a reference to an indexed object in UnTextIndex._index. But assuming 10 bytes for each reference, and assuming that on average each indexed word/Lexicon entry occurs in 20 objects, the _index object will require around 200 MB if it is kept completely in RAM. This can easily be avoided if the process of updating UnTextIndex._index is separated from the other indexing stuff. With the attached patch, UnTextIndex.index_object writes the data for new _index entries as lines containing WordId, DocumentId, and score into a pipe which is connected to Good Old sort(1). The output from sort is read by the new UnTextIndex method updateWordBTree. This way, each entry of UnTextIndex._index needs to be updated only once, while the original implementation of UnTextIndex required an update for each occurrence of a word in every indexed object. There are two obvious points where the patch can be optimized. 
(1) the "perfectly sorted" list returned by sort(1) is obviously not very efficient for building the IIBTree indexRow in updateWordBTree, and (2) the type check for indexRow in updateWordBTree can of course be avoided after the type has been set to IIBTree. (updateWordBTree is simply a modified version of insertForwardIndexEntry -- not very clever, but easiest to implement :) Abel PS: Watching Zope index the above-mentioned data with subtransactions enabled, I wondered where all the data from the finished subtransactions is stored. Data.fs is not updated, and I could not find any temporary files. (OK, I have not yet had the chance to look at what's going on while updateWordBTree is running; during my tests, it was called in the middle of the night -- but while ZCatalog.ZopeFindAndApply is running, nothing seems to happen.)
--- Catalog.py.orig Thu Apr 19 19:07:29 2001 +++ Catalog.py Fri May 25 13:46:16 2001 @@ -105,6 +105,7 @@ from SearchIndex.randid import randid import time +import popen2 def orify(seq, query_map={ @@ -379,8 +380,24 @@ self.indexes = indexes # the cataloging API + + def openPipes(self): + pipes={} + for name in self.indexes.keys(): + index = self.indexes[name] + index = index.__of__(self) + if hasattr(index, 'updateWordBTree'): + pipes[name] = popen2.Popen3("sort", 1) + return pipes + + def updateWordBTrees(self, pipes, threshold=None): + for name in self.indexes.keys(): + index = self.indexes[name] + index = index.__of__(self) + if hasattr(index, 'updateWordBTree'): + index.updateWordBTree(pipes[name], threshold) - def catalogObject(self, object, uid, threshold=None): + def catalogObject(self, object, uid, threshold=None, pipes=None): """ Adds an object to the Catalog by iteratively applying it all indexes. @@ -439,17 +456,34 @@ self.paths[index] = uid total = 0 - for x in self.indexes.values(): - ## tricky! indexes need to acquire now, and because they - ## are in a standard dict __getattr__ isn't used, so - ## acquisition doesn't kick in, we must explicitly wrap! - x = x.__of__(self) - if hasattr(x, 'index_object'): - blah = x.index_object(index, object, threshold) - total = total + blah - else: - LOG('Catalog', ERROR, ('catalogObject was passed ' - 'bad index object %s.' % str(x))) + if pipes is None: + for x in self.indexes.values(): + ## tricky! indexes need to acquire now, and because they + ## are in a standard dict __getattr__ isn't used, so + ## acquisition doesn't kick in, we must explicitly wrap! + x = x.__of__(self) + if hasattr(x, 'index_object'): + blah = x.index_object(index, object, threshold) + total = total + blah + else: + LOG('Catalog', ERROR, ('catalogObject was passed ' + 'bad index object %s.' % str(x))) + else: + for name in self.indexes.keys(): + x = self.indexes[name] + ## tricky! 
indexes need to acquire now, and because they + ## are in a standard dict __getattr__ isn't used, so + ## acquisition doesn't kick in, we must explicitly wrap! + x = x.__of__(self) + if hasattr(x, 'index_object'): + if pipes.has_key(name): + blah = x.index_object(index, object, threshold, pipes[name]) + else: + blah = x.index_object(index, object, threshold) + total = total + blah + else: + LOG('Catalog', ERROR, ('catalogObject was passed ' + 'bad index object %s.' % str(x))) return total
--- UnTextIndex.py.orig Wed Mar 28 01:43:11 2001 +++ UnTextIndex.py Fri May 25 14:01:46 2001 @@ -305,7 +305,7 @@ # put our first entry in, and use a tuple to save space index[entry] = (documentId, score) - def index_object(self, documentId, obj, threshold=None): + def index_object(self, documentId, obj, threshold=None, pipe=None): """ Index an object: 'documentId' is the integer id of the document @@ -356,9 +356,14 @@ # Now index the words. Note that the new xIBTrees are clever # enough to do nothing when there isn't a change. Woo hoo. - insert=self.insertForwardIndexEntry - for wid, score in widScores.items(): - insert(wid, documentId, score) + if pipe is None: + insert=self.insertForwardIndexEntry + for wid, score in widScores.items(): + insert(wid, documentId, score) + else: + write = pipe.tochild.write + for wid, score in widScores.items(): + write(`wid` + " " + `documentId` + " " + `score` + "\n") # Save the unindexing info if it's changed: wids=widScores.keys() @@ -367,6 +372,80 @@ return len(wids) + def updateWordBTree(self, pipe, threshold=None): + pipe.tochild.close() + readline = pipe.fromchild.readline + print "waiting for pipe input" + + index = self._index + + s = readline() + lastentry = None + count = 0 + + while len(s) > 4: + (entry, documentId, score) = s.split(' ') + entry = int(entry) + documentId = int(documentId) + score = int(score) + + # copied from insertForwardIndexEntry: + if entry != lastentry: + indexRow = index.get(entry, None) + if threshold is not None: + count = count + 1 + if count >= threshold: + get_transaction().commit(1) + self._p_jar.cacheFullSweep(3) + print "commit indexer", s, + count = 0 + + if indexRow is not None: + if type(indexRow) is TupleType: + # Tuples are only used for rows which have only + # a single entry. Since we now need more, we'll + # promote it to a mapping object (dictionary). + + # First, make sure we're not already in it, if so + # update the score if necessary. 
+ if indexRow[0] == documentId: + if indexRow[1] != score: + indexRow = (documentId, score) + index[entry] = indexRow + else: + indexRow={ + indexRow[0]: indexRow[1], + documentId: score, + } + index[entry] = indexRow + else: + # xxx optimization: put a "readline loop" + # into _this_ block!! + if indexRow.get(documentId, -1) != score: + # score changed (or new entry) + + if type(indexRow) is DictType: + indexRow[documentId] = score + if len(indexRow) > 3: + # Big enough to give it's own database record + indexRow=IIBTree(indexRow) + index[entry] = indexRow + else: + indexRow[documentId] = score + else: + # We don't have any information at this point, so we'll + # put our first entry in, and use a tuple to save space + indexRow = index[entry] = (documentId, score) + + lastentry = entry + s = readline() + + pipe.fromchild.close() + res = pipe.wait() + # xxx I'm new to Python and Zope... How should an error + # be handled? raise an exception? + return + def _subindex(self, source, wordScores, last, splitter): """Recursively handle multi-word synonyms""" for word in splitter(source):
--- ZCatalog.py.orig Wed Mar 21 23:48:04 2001 +++ ZCatalog.py Fri May 25 13:48:04 2001 @@ -359,6 +359,7 @@ obj = REQUEST.PARENTS[1] path = string.join(obj.getPhysicalPath(), '/') + pipelist = self._catalog.openPipes() results = self.ZopeFindAndApply(obj, obj_metatypes=obj_metatypes, @@ -372,8 +373,11 @@ search_sub=1, REQUEST=REQUEST, apply_func=self.catalog_object, - apply_path=path) + apply_path=path, + pipes=pipelist) + self._catalog.updateWordBTrees(pipelist, self.threshold) + elapse = time.time() - elapse c_elapse = time.clock() - c_elapse @@ -412,7 +416,7 @@ RESPONSE.redirect(URL1 + '/manage_catalogIndexes?manage_tabs_message=Index%20Deleted') - def catalog_object(self, obj, uid=None): + def catalog_object(self, obj, uid=None, pipes=None): """ wrapper around catalog """ if uid is None: @@ -426,13 +430,12 @@ elif type(uid) is not StringType: raise CatalogError('The object unique id must be a string.') - self._catalog.catalogObject(obj, uid, None) + self._catalog.catalogObject(obj, uid, None, pipes) # None passed in to catalogObject as third argument indicates # that we shouldn't try to commit subtransactions within any # indexing code. We throw away the result of the call to # catalogObject (which is a word count), because it's # worthless to us here. - if self.threshold is not None: # figure out whether or not to commit a subtransaction. t = id(get_transaction()) @@ -449,6 +452,7 @@ # to commit a subtransaction. The semantics here mean that # we should commit a subtransaction if our threshhold is # exceeded within the boundaries of the current transaction. + if self._v_total > self.threshold: get_transaction().commit(1) self._p_jar.cacheFullSweep(3) @@ -574,7 +578,7 @@ obj_permission=None, obj_roles=None, search_sub=0, REQUEST=None, result=None, pre='', - apply_func=None, apply_path=''): + apply_func=None, apply_path='', pipes=None): """Zope Find interface and apply This is a *great* hack. 
Zope find just doesn't do what we @@ -649,7 +653,7 @@ ) ): if apply_func: - apply_func(ob, (apply_path+'/'+p)) + apply_func(ob, (apply_path+'/'+p), pipes) else: add_result((p, ob)) dflag=0 @@ -661,7 +665,7 @@ obj_permission, obj_roles, search_sub, REQUEST, result, p, - apply_func, apply_path) + apply_func, apply_path, pipes) if dflag: ob._p_deactivate() return result