Hi Henry,

You're right, this isn't very well documented at all. Maybe we can use this discussion to open some JIRA tickets to document it better (both in the javadocs and on the wiki).
On Fri, Jul 26, 2013 at 12:46 AM, Henry Lee <[email protected]> wrote:
> I'd like to build an app where I build an LDA model offline periodically by
> Amazon EMR/Hadoop, and I make a document/topic inference for a new document
> online.
>
> I read a post/a reply about using the LDA CVB model to match a new doc to
> topics (http://tinyurl.com/mef4czr)
>
> I have some questions about using the LDA CVB TopicModel class, which isn't
> well documented:
>
> Q: how many iterations are good? and why?

This will depend heavily on your number of topics and vocabulary size, as well as the size of your document. But I've never seen much gain after 40-50 iterations, and most of the time there's not much after 20 (I typically default to somewhere between 20 and 30, independent of the document size).

The way you *really* want to be sure is to verify that p(doc | topic_distribution(doc)) has plateaued (equivalently, that the perplexity has plateaued). To check this, periodically (maybe every 5 iterations) call model.perplexity(doc, docTopics), and keep iterating only while the *relative* decrease in that value is still greater than some epsilon. Of course, now instead of an arbitrary maxIters (20-30), you have an arbitrary epsilon (0.001, say).

If you're still bothered by the arbitrariness of that choice, you can instead verify that the *ordering* of the topics (by probability) for the document has stabilized. This will probably happen before the perplexity plateaus, and before complete convergence: the probabilities will still be changing, but their relative sizes will already have settled into an ordering.

> Q: do we get model mutated by training w/ new doc? why?

No, we do not.
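Back on the iterations question for a moment: to make the plateau check concrete, here's a minimal sketch in plain Java. The class and the perplexity numbers are mine, made up for illustration; in real code each value would come from model.perplexity(doc, docTopics) after a few more calls to model.trainDocTopicModel(doc, docTopics, docTopicModel):

```java
// Sketch: stop iterating once the relative decrease in perplexity
// falls below epsilon. Perplexity values here are hypothetical
// stand-ins for measurements taken every few training iterations.
public class PlateauCheck {
    // Returns how many measurements were consumed before the relative
    // decrease first dropped below epsilon (i.e., when we'd stop).
    public static int iterationsUntilPlateau(double[] perplexities, double epsilon) {
        for (int i = 1; i < perplexities.length; i++) {
            double relativeDecrease =
                (perplexities[i - 1] - perplexities[i]) / perplexities[i - 1];
            if (relativeDecrease < epsilon) {
                return i; // plateaued: further iterations buy very little
            }
        }
        return perplexities.length; // never plateaued within the budget
    }

    public static void main(String[] args) {
        // Hypothetical perplexity measurements, one per 5 iterations:
        double[] p = {1000.0, 600.0, 450.0, 420.0, 419.5, 419.4};
        System.out.println(iterationsUntilPlateau(p, 0.001)); // prints 5
    }
}
```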
If you do want the model updated as you train on new docs, that's easy to do, however: you can either update the current model you're using (which means you need to be careful about concurrency if you're running this as a singleton model in a webservice of some kind), or you can accumulate the model updates onto a new model.

To see this in action, look at the ModelTrainer class in the same package. Basically it works like this: after converging the docTopics vector, you take the SparseMatrix (docTopicModel) and just call model.update(docTopicModel). This is an asynchronous update which updates the model in a multithreaded way. I don't know how safe it is to be simultaneously calling model.train() and model.update() on the same model from different threads. The way this is done in ModelTrainer is that all calls to ModelTrainer.train() are async and queue up incoming documents into a work queue, which calls model.trainDocTopicModel() (a read-only operation) on many docs in parallel (from different threads), while the calls to model.update() are by their nature also queued and threaded, so no topic vector in the model is getting written to by more than one thread at a time. But even here, the question of how entirely safe/accurate these simultaneous many-reads and single-writes (per topic) are remains open.

In practice, the default operation in Mahout is to *not* read and write from the same model while training, but to read from one model (which was averaged over all mapper outputs after the previous MR iteration) and write updates to a fresh, empty model, which is what gets emitted at the end of the iteration. In theory, running this in "online learning mode" should work and converge much faster (see e.g. Hoffman, Blei, and Bach: http://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf), but be warned: we're not doing what they do there to guarantee convergence, so in theory this may not converge at all.

> Q: what is inferred in this program? how to use that infer() method?
Infer is a weirdly named method: it's basically the LDA version of what you do in SVD/LSI, where you project onto your reduced-dimensional space and then "re-embed" back in your original space. What it calculates, exactly, is the posterior probability of each of the original terms in your document, given the model and the p(topic | doc) distribution which the model converged to for that document. It's a vector over the terms in your original vocabulary, restricted to only those in the original input document, where the value at term i is p(i) = sum over topics x of (p(i|x) * p(x|doc)). It can be a way of seeing what the model thinks the most relevant terms in a document are.

> and Q: Has anyone seen good example usages/sample code of doing this kind
> of task?

Mahout clustering and classification really need more of a service interface (collaborative filtering already has Taste), so that we can show off this kind of thing properly. I have code which does this at Twitter, and while we could open source it fairly easily, it doesn't really have "a place to live".

> Thanks,
> Henry Lee.
>
> See my code below:
>
>     @Test
>     public void testOfJakeMannixIdeaAndQuestions() { // [email protected]
>         val conf = new Configuration();
>         val dictionary = readDictionary(new Path("/tmp/dictionary.file-0"), conf);
>         assertThat(dictionary.length, equalTo(41807));
>
>         // tfidf_vector represents a document in RandomAccessSparseVector.
>         val tfidf_vector = readTFVectorsInRange(new Path("/tmp/tfidf-vectors"), conf, 0, 1)[0].getSecond();
>         assertThat(tfidf_vector.size(), equalTo(41807));
>
>         // reads 'model' dense matrix (20 x 41K), and the 'topicSum' dense vector.
>         TopicModel model = readModel(dictionary, new Path("/tmp/reuters-lda-model-splits"), conf);
>         assertThat(model.getNumTopics(), equalTo(20));
>         assertThat(model.getNumTerms(), equalTo(41807));
>
>         val doc = tfidf_vector;
>         Vector docTopics = new DenseVector(new double[model.getNumTopics()]).assign(1.0 / model.getNumTopics());
>         Matrix docTopicModel = new SparseRowMatrix(model.getNumTopics(), doc.size());
>
>         // Q: How many iterations are good? Why?
>         for (int i = 0; i < 100 /* maxItrs */; i++) {
>             model.trainDocTopicModel(doc, docTopics, docTopicModel);
>             System.out.println(docTopics.toString());
>             // Q: Do you think that 'model' got mutated, or not? why?
>         }
>
>         Vector inferred = model.infer(doc, docTopics);
>         System.out.println(inferred); // Q: What is this inferred? How can I use it?
>     }
>
>     @SneakyThrows({ IOException.class })
>     private static Pair<String, Vector>[] readTFVectorsInRange(Path path, Configuration conf, int offset, int length) {
>         val seq = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
>         val documentName = new Text();
>         @SuppressWarnings("unchecked")
>         Pair<String, Vector>[] vectors = new Pair[length];
>         VectorWritable vector = new VectorWritable();
>         for (int i = 0; i < offset + length && seq.next(documentName, vector); i++) {
>             if (i >= offset) {
>                 vectors[i - offset] = Pair.of(documentName.toString(), vector.get());
>             }
>         }
>         return vectors;
>     }
>
>     @SneakyThrows({ IOException.class })
>     private static TopicModel readModel(String[] dictionary, Path path, Configuration conf) {
>         double alpha = 0.0001; // default: doc-topic smoothing
>         double eta = 0.0001;   // default: term-topic smoothing
>         double modelWeight = 1f;
>         return new TopicModel(conf, eta, alpha, dictionary, 1, modelWeight, listModelPath(path, conf));
>     }
>
>     @SneakyThrows({ IOException.class })
>     private static Path[] listModelPath(Path path, Configuration conf) {
>         if (FileSystem.get(conf).isFile(path)) {
>             return new Path[] { path };
>         } else {
>             val statuses =
>                 FileSystem.get(conf).listStatus(path, PathFilters.partFilter());
>             val modelPaths = new Path[statuses.length];
>             for (int i = 0; i < statuses.length; i++) {
>                 modelPaths[i] = new Path(statuses[i].getPath().toUri().toString());
>             }
>             return modelPaths;
>         }
>     }
>
>     @SneakyThrows({ IOException.class })
>     private static String[] readDictionary(Path path, Configuration conf) {
>         val term = new Text();
>         val id = new IntWritable();
>         val reader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
>         val termIds = ImmutableList.<Pair<String, Integer>>builder();
>         int maxId = 0;
>         while (reader.next(term, id)) {
>             termIds.add(Pair.of(term.toString(), id.get()));
>             maxId = max(maxId, id.get());
>         }
>         String[] terms = new String[maxId + 1];
>         for (val termId : termIds.build()) {
>             terms[termId.getSecond().intValue()] = termId.getFirst().toString();
>         }
>         return terms;
>     }

-- 
-jake
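P.S. To make what infer() computes concrete: p(i) = sum over topics x of p(i|x) * p(x|doc), restricted to the terms in the document. A toy version in plain Java (the model, document, and class here are invented for illustration, and this is not Mahout's actual TopicModel code):

```java
// Toy infer(): p(term i) = sum over topics x of p(i|x) * p(x|doc),
// computed only for terms that appear in the document (others stay 0),
// mirroring how infer() restricts itself to the input doc's terms.
public class InferSketch {
    // pTermGivenTopic[x][i] = p(term i | topic x); docTopics[x] = p(topic x | doc).
    public static double[] infer(double[][] pTermGivenTopic, double[] docTopics, int[] docTerms) {
        double[] result = new double[pTermGivenTopic[0].length];
        for (int i : docTerms) {
            for (int x = 0; x < docTopics.length; x++) {
                result[i] += pTermGivenTopic[x][i] * docTopics[x];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] model = {
            {0.7, 0.2, 0.1},  // topic 0's distribution over a 3-term vocabulary
            {0.1, 0.3, 0.6},  // topic 1's
        };
        double[] docTopics = {0.5, 0.5};  // converged p(topic | doc)
        int[] docTerms = {0, 2};          // the doc contains only terms 0 and 2
        double[] p = infer(model, docTopics, docTerms);
        System.out.println(p[0] + " " + p[1] + " " + p[2]);
    }
}
```

Sorting the resulting vector by value is the "what does the model think the most relevant terms are" trick mentioned above.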

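Also, a stripped-down illustration of the read-from-one-model, write-to-a-fresh-model pattern I mentioned, with the models reduced to bare topic x term count matrices (a toy of mine, not Mahout's actual TopicModel/ModelTrainer code):

```java
// Sketch: during an iteration, trainDocTopicModel only *reads* the
// read model; converged per-document counts are accumulated into a
// separate, initially empty write model, which is emitted at the end.
public class TwoModelUpdate {
    // Accumulate a converged docTopicModel into the write model.
    public static void update(double[][] writeModel, double[][] docTopicModel) {
        for (int x = 0; x < writeModel.length; x++) {
            for (int t = 0; t < writeModel[x].length; t++) {
                writeModel[x][t] += docTopicModel[x][t];
            }
        }
    }

    public static void main(String[] args) {
        double[][] readModel = {{1, 2, 3}, {4, 5, 6}}; // averaged from the previous iteration; read-only here
        double[][] writeModel = new double[2][3];      // fresh and empty
        // Pretend two documents converged to these topic/term matrices:
        update(writeModel, new double[][]{{0.5, 0.0, 0.5}, {0.0, 1.0, 0.0}});
        update(writeModel, new double[][]{{0.0, 0.25, 0.0}, {0.75, 0.0, 0.0}});
        System.out.println(writeModel[1][0]); // prints 0.75; readModel is untouched
    }
}
```

This is why the default MR training never has the read/write races discussed above: the model being read is frozen for the whole iteration.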