Specifically, I hacked together a Lambda Streaming CCO with Spark and Flink for a demo for my upcoming FlinkForward talk. I will post the code once I finish it and strip all my creds out. In short, the lack of serialization on Mahout in-core vectors/matrices makes handing them off and dealing with them somewhat tedious.
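For anyone hitting the same wall before I post that code: the workaround I know of is to round-trip vectors through VectorWritable, Mahout's Hadoop Writable wrapper for in-core vectors, since the vector classes themselves are not java.io.Serializable. A rough sketch (generic scaffolding, not the demo code):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.mahout.math.{RandomAccessSparseVector, Vector, VectorWritable}

// Mahout in-core vectors are not java.io.Serializable, so to hand one across
// a process or framework boundary we round-trip it through VectorWritable,
// which implements Hadoop's Writable interface.
def vectorToBytes(v: Vector): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val out  = new DataOutputStream(baos)
  new VectorWritable(v).write(out) // Writable.write
  out.close()
  baos.toByteArray
}

def bytesToVector(bytes: Array[Byte]): Vector = {
  val in = new DataInputStream(new ByteArrayInputStream(bytes))
  val vw = new VectorWritable()
  vw.readFields(in) // Writable.readFields
  vw.get()
}

// Usage: a sparse vector with a couple of non-zeros survives the round trip.
val v = new RandomAccessSparseVector(100)
v.setQuick(3, 1.0)
v.setQuick(42, 2.0)
assert(bytesToVector(vectorToBytes(v)).get(42) == 2.0)
```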
Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things." -Virgil*

On Sun, Apr 9, 2017 at 5:39 PM, Andrew Palumbo <ap....@outlook.com> wrote:

> Pat-
>
> What can we do from the Mahout side? Would we need any new data
> structures? Trevor and I were just discussing some of the troubles of
> near real time matrix streaming.
> ------------------------------
> *From:* Pat Ferrel <p...@occamsmachete.com>
> *Sent:* Monday, March 27, 2017 2:42:55 PM
> *To:* Ted Dunning; user@mahout.apache.org
> *Cc:* Trevor Grant; Ted Dunning; s...@apache.org
> *Subject:* Re: Lambda and Kappa CCO
>
> Agreed. Downsampling was ignored in several places, and with it a great
> deal of input becomes a no-op. Without downsampling too many things would
> need to change.
>
> Also, everything is dependent on this rather vague sentence: “- determine
> if the new interaction element cross-occurs with A and if so calculate the
> llr score”, which needs a lot more explanation. Whether to use Mahout
> in-memory objects or reimplement some in high speed data structures is a
> big question.
>
> The good thing I noticed in writing this is that model update and real
> time queries can be arbitrarily far apart; the system degrades gracefully.
> So during high load it may fall behind, but as long as user behavior is
> up-to-date and persisted (it will be) we are still in pretty good shape.
>
>
> On Mar 26, 2017, at 6:26 PM, Ted Dunning <tdunn...@mapr.com> wrote:
>
>
> I think that this analysis omits the fact that one user interaction causes
> many cooccurrences to change.
>
> This becomes feasible if you include the effect of down-sampling, but that
> has to be in the algorithm.
>
>
> From: Pat Ferrel <p...@occamsmachete.com>
> Sent: Saturday, March 25, 2017 12:01:00 PM
> To: Trevor Grant; user@mahout.apache.org
> Cc: Ted Dunning; s...@apache.org
> Subject: Lambda and Kappa CCO
>
> This is an overview and proposal for turning the multi-modal Correlated
> Cross-Occurrence (CCO) recommender from a Lambda-style learner into an
> online, streaming, incrementally updated Kappa-style learner.
>
> # The CCO Recommender: Lambda-style
>
> We have largely solved the problems of calculating the multi-modal
> Correlated Cross-Occurrence models and serving recommendations in real
> time from real time user behavior. The model sits in Lucene (Elasticsearch
> or Solr) in a scalable way, and the typical query, which produces
> personalized recommendations from real time user behavior, completes with
> 25 ms latency.
>
> # CCO Algorithm
>
> A = rows are users, columns are items they have “converted” on (purchase,
> read, watch). A represents the conversion event: the interaction that you
> want to recommend.
> B = rows are users, columns are items that the user has shown some
> preference for, but not necessarily the same items as in A. B represents a
> different interaction than A. B might be a preference for some category,
> brand, genre, or just a detailed item page view, or all of these as B, C,
> D, etc.
> h_a = a particular user’s history of A type interactions, a vector of
> items that our user converted on.
> h_b = a particular user’s history of B type interactions, a vector of
> items that our user had B type interactions with.
>
> CCO says:
>
> [A’A]h_a + [A’B]h_b + [A’C]h_c = r, where r is a vector of weighted items
> from A that represents personalized recommendations for our particular
> user.
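Inline note: to pin down the arithmetic, here is a toy Scala sketch of that formula, with plain maps standing in for the sparse matrices. All item names and weights are made up:

```scala
// Toy sketch of r = [A'A]h_a + [A'B]h_b, with sparse matrices modeled as
// Map(rowItem -> Map(colItem -> weight)). All data here is hypothetical.
type SparseMatrix = Map[String, Map[String, Double]]

// Cross-occurrence weights: rows are items from A; columns are items from A (or B).
val ata: SparseMatrix = Map(
  "ipad"   -> Map("iphone" -> 0.9, "galaxy" -> 0.1),
  "iphone" -> Map("ipad"   -> 0.9))
val atb: SparseMatrix = Map(
  "ipad"   -> Map("tablets-category" -> 0.7),
  "iphone" -> Map("phones-category"  -> 0.8))

// One user's histories: h_a = conversions, h_b = secondary interactions.
val hA = Set("iphone")
val hB = Set("phones-category", "tablets-category")

// r(item) = sum of the weights linking the item's row to the user's history.
def score(model: SparseMatrix, history: Set[String]): Map[String, Double] =
  model.map { case (item, row) =>
    item -> row.collect { case (i, w) if history(i) => w }.sum
  }

val r = (score(ata, hA).toSeq ++ score(atb, hB).toSeq)
  .groupBy(_._1)
  .map { case (item, scores) => item -> scores.map(_._2).sum }
  .toSeq.sortBy(-_._2) // top-ranked items from A for this user

r.foreach(println) // (ipad, 1.6), (iphone, 0.8), ...
```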
> The innovation here is that A, B, C, … represent multi-modal data:
> interactions of all types, on item-sets of arbitrary types. In other
> words, we can look at virtually any action or possible indicator of user
> preference or taste. We strengthen the above raw cross-occurrence and
> cooccurrence formula by performing:
>
> [llr(A’A)]h_a + [llr(A’B)]h_b + … = r, adding llr (log-likelihood ratio)
> correlation scoring to filter out coincidental cross-occurrences.
>
> The model becomes [llr(A’A)], [llr(A’B)], …; each has items from A in rows
> and items from A, B, … in columns. This sits in Lucene as one document per
> item in A, with a field for each of the A, B, C item-sets holding the
> items whose user interactions most strongly correlate with a conversion on
> the row item. Put another way, the model is, for each item in A, the items
> from A, B, C, … that have the most similar user interactions.
>
> To calculate r we need to find the items in the model most similar to the
> history or behavior of our example user. Since Lucene is basically a
> k-nearest-neighbors engine that is particularly well tuned to work with
> sparse data (our model is typically quite sparse), all we need to do is
> segment the user history into h_a, h_b, … and use it as a multi-field
> query on the model. This performs the equivalent of:
>
> [llr(A’A)]h_a + [llr(A’B)]h_b + … = r, where we substitute the cosine
> similarity of h_a to every row of [llr(A’A)] for the tensor math. Further,
> Lucene sorts by score and returns only the top-ranking items. Even
> further, we note that since we have performed a multi-field query, Lucene
> does the entire multi-field similarity calculation and vector-segment
> addition before doing the sort. Lucene does this in a very performant
> manner, so the entire query, including fetching user history, forming the
> Lucene query, and executing it, takes something like 25 ms and is
> indefinitely scalable to any number of simultaneous queries.
>
> Problem solved?
>
> Well, yes and no. The above method I’ve labeled a Lambda-style
> recommender. It uses real time user history and makes recommendations in
> real time, but it can only recommend items in A. So if A is changing
> rapidly, as when the items have short lifetimes (newsy items or social
> media things like tweets), then A can get out of date in hours or minutes.
> The other downside of Lambda CCO is that the entirety of the data in A, B,
> C, … has to be re-examined every time a new model is calculated. With data
> on the order of a terabyte or more, this is quite a cost in compute
> resources.
>
> It is true that most typical A items will not change often. Think of the
> typical e-commerce case, where A represents items in a catalog that change
> only infrequently. But the resources required to recalculate the model
> remain high, even if they are only needed once per week. We have seen the
> existing Lambda-style system take many AWS x1.32xlarge instance-hours to
> recalculate the model. This translates into a significant cost for model
> calculation alone.
>
> # Streaming Online CCO Kappa-style
>
> Since we have a good solution for serving query results from the model, we
> might solve the Lambda CCO issues by converting the model re-calc into a
> streaming, online, incrementally updated process. In other words, instead
> of replacing the model with a new one periodically, let’s see if we can
> update it with each new interaction in real time.
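Inline note: as a concrete picture of the serving side just described, here is a minimal sketch of the kind of multi-field query involved, built as an Elasticsearch JSON body from Scala. The index name, the field names ("ata", "atb"), and the items are all hypothetical:

```scala
// Minimal sketch: the user's segmented history (h_a, h_b) becomes one
// multi-field OR query against the model index. One document per item in A,
// one field per model segment. All names here are hypothetical.
val hA = Seq("iphone")                              // conversion history
val hB = Seq("phones-category", "tablets-category") // secondary interactions

def termsClause(field: String, items: Seq[String]): String =
  items.map(i => s"""{ "term": { "$field": "$i" } }""").mkString(", ")

// Score = sum over fields of similarity(history segment, doc field), the
// Lucene stand-in for [llr(A'A)]h_a + [llr(A'B)]h_b + ...
val query = s"""
{
  "size": 20,
  "query": {
    "bool": {
      "should": [ ${termsClause("ata", hA)}, ${termsClause("atb", hB)} ]
    }
  }
}
"""
println(query) // POST to /cco-model/_search (hypothetical index name)
```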
> Stripping away the query part of the CCO algorithm, the question is: can
> we update
>
> [llr(A’A)], [llr(A’B)], …
>
> in real time?
>
> Caveats: updates may be micro-batched rather than applied with each new
> event; real time really means “near real time”; and the update frequency
> will be allowed to vary with input frequency.
>
> We note a few things about the problem:
>
> 1) A, B, … are sparse matrices keyed by user-id. Each can be modeled as a
> hashmap in memory or as a persistent distributed indexed dataset in some
> fast NoSQL DB.
> 2) If we need to update the model, we also need to update the data that
> produces the model, A, B, C, in a persistent way.
> 3) A single interaction will only affect one segment of the model (A’A or
> A’B but not both) and one input matrix.
> 4) The log-likelihood ratio needs to know a) whether there is a
> cross-occurrence and b) the counts of interactions from several
> perspectives, corresponding to row, column, and total density of the input
> matrices (see the llr sketch just below).
> 5) Because of #4, the important data can be stored and updated as single
> elements of vectors, without examining the entirety of the input.
>
> We can make several decisions now based on system requirements:
>
> 1) How important is it to persist A, B, C, etc.? If we save interaction
> logs we can always re-create them when a system is taken down and brought
> back up. This implies that in-memory hashmaps of sparse vectors are
> sufficient for the solution, even though a “reboot” may then require some
> time. If reboot time is critical we can keep A, B, ... in a mutable DB.
> 2) The serving layer persists A’A, … and is highly performant and scalable
> with one of the distributed versions of Lucene, so we leave that in place.
> The caveat is that we must now add new load to Lucene: we must update the
> model in place rather than recalculating it en masse periodically. This
> has the benefit of also persisting the model, so it is “reboot” resilient.
> 3) If we are persisting the user interaction histories h_a, h_b, … in a
> DB, then we have the makings of queries.
>
> This leaves only the collected interactions in A, B, … that must be
> accessed and updated in real time, but if they were to disappear, all we
> would lose is real time model updating; the queries would still work and
> the system as a whole would continue to operate with very little notice of
> the loss. Therefore in-memory data structures (which give the ultimate in
> speed) should be sufficient for the Kappa-style solution. They will be
> separate hashmaps with a sparse vector per key, remembering that the key
> is an item from A and the vector is items from A, B, C ...
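Inline note on point 4: the counts llr needs amount to a 2x2 contingency table per item pair, so each cell can be kept as a simple, incrementally updated counter. Here is a minimal Scala sketch of the scoring itself, mirroring the entropy formulation Mahout uses in org.apache.mahout.math.stats.LogLikelihood:

```scala
// Dunning's log-likelihood ratio over a 2x2 contingency table of counts:
//   k11 = users with both events    k12 = users with the row event only
//   k21 = users with the column event only    k22 = users with neither
// Mahout ships this as org.apache.mahout.math.stats.LogLikelihood; it is
// re-implemented here only to show which counts must be kept up to date.
def xLogX(x: Long): Double = if (x == 0L) 0.0 else x * math.log(x.toDouble)

// Unnormalized entropy over raw counts (Mahout's formulation).
def entropy(counts: Long*): Double = xLogX(counts.sum) - counts.map(xLogX).sum

def llr(k11: Long, k12: Long, k21: Long, k22: Long): Double = {
  val rowEntropy    = entropy(k11 + k12, k21 + k22)
  val columnEntropy = entropy(k11 + k21, k12 + k22)
  val matrixEntropy = entropy(k11, k12, k21, k22)
  if (rowEntropy + columnEntropy < matrixEntropy) 0.0 // guard against rounding
  else 2.0 * (rowEntropy + columnEntropy - matrixEntropy)
}

// A pair that co-occurs far above chance scores high; chance-rate
// co-occurrence scores near zero, which filters coincidental cross-occurrences.
println(llr(10, 5, 5, 10000))    // high: strongly correlated
println(llr(5, 495, 495, 49005)) // ~0: co-occurs only at the chance rate
```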
> # Kappa algorithm
>
> Transforming Lambda to Kappa requires the following per-interaction steps
> (a sketch of this loop appears at the end of this note):
>
> - receive a new interaction and store it in a persistent DB, as in the
> Lambda implementation, which we know is fast
> - insert it into the hashmap that should contain it, based on the
> interaction type (A, B, …)
> - recalculate the affected elements of the row and column non-zero-count
> vectors and update them in-memory; these are used in llr and can be
> updated without re-counting matrix elements
> - determine if the new interaction element cross-occurs with A and, if
> so, calculate the llr score
> - fetch the model field for this cross-occurrence and add the new
> interaction item if it scores high enough to meet the same threshold as
> used in Lambda-style; several options are available here
>
> Using the existing performant infrastructure of the Lambda recommender
> for persistence, we have reduced the CCO model updates to in-memory
> hashmaps and vectors plus, potentially, one update of a single doc field
> in Lucene. “Potentially” is an important modifier, because very often an
> update to the persistent model will not be required.
>
> At any time we can recreate the live system by rebooting, and even if the
> model update mechanism stops, queries continue to be served with real
> time data. Disregarding DB or Lucene failure, the system reverts from
> Kappa to Lambda gracefully, and back to Kappa with a restart of the
> system managing the in-memory data.
>
> Many diagrams and illustrations showing actual component choices are to
> be inserted. Benchmarks for update timing are also to be added.
>
>
> Any thoughts? The Kappa-style implementation is not started, so
> suggestions are welcome. The Lambda-style version is operational as The
> Universal Recommender, built on PredictionIO, HBase, Elasticsearch,
> Spark, and Mahout. It is documented here: http://actionml.com/docs/ur and
> here: http://mahout.apache.org/users/algorithms/intro-cooccurrence-spark.html
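Pulling the pieces of the “# Kappa algorithm” list together, here is the minimal sketch promised above. It is purely illustrative: the hashmap layout, counter names, threshold, and the updateLuceneField stub are all hypothetical stand-ins for whatever the real implementation chooses.

```scala
import scala.collection.mutable

// A minimal, hypothetical sketch of the per-interaction Kappa loop. Every
// name, counter, and the Lucene stub below is scaffolding, not a design.
object KappaUpdater {

  // llr over a 2x2 table, as in the earlier sketch.
  private def xLogX(x: Long): Double = if (x == 0L) 0.0 else x * math.log(x.toDouble)
  private def entropy(ks: Long*): Double = xLogX(ks.sum) - ks.map(xLogX).sum
  private def llr(k11: Long, k12: Long, k21: Long, k22: Long): Double =
    math.max(0.0, 2.0 * (entropy(k11 + k12, k21 + k22)
      + entropy(k11 + k21, k12 + k22) - entropy(k11, k12, k21, k22)))

  // In-memory input matrices: matrix name -> (userId -> itemIds). "A" holds
  // conversions; "B", "C", ... hold the secondary interaction types.
  private val histories =
    mutable.Map[String, mutable.Map[String, mutable.Set[String]]]()

  // Incrementally maintained counters that feed llr; updating them touches
  // single elements only, never the entirety of the input.
  private val itemCount = mutable.Map[(String, String), Long]().withDefaultValue(0L)
  private val coCount   = mutable.Map[(String, String, String), Long]().withDefaultValue(0L)
  var totalUsers = 0L // maintained elsewhere; the basis of k22

  private val threshold = 10.0 // stand-in for the Lambda-style llr cutoff
  private def updateLuceneField(rowItem: String, field: String, item: String): Unit =
    () // stub for the single-doc, single-field update in Lucene/Elasticsearch

  def onInteraction(matrix: String, userId: String, itemId: String): Unit = {
    // 1) the raw event is also written to the persistent DB (elided here)
    val userHist = histories
      .getOrElseUpdate(matrix, mutable.Map())
      .getOrElseUpdate(userId, mutable.Set())
    if (!userHist.add(itemId)) return // repeat event: counts unchanged
    itemCount((matrix, itemId)) += 1

    // 2) find cross-occurrences with this user's conversion history in A
    for {
      aHist <- histories.get("A").flatMap(_.get(userId)).toSeq
      aItem <- aHist if !(matrix == "A" && aItem == itemId) // skip self-pairs
    } {
      coCount((matrix, aItem, itemId)) += 1
      val k11 = coCount((matrix, aItem, itemId))           // both events
      val k12 = itemCount(("A", aItem)) - k11              // row event only
      val k21 = itemCount((matrix, itemId)) - k11          // column event only
      val k22 = math.max(0L, totalUsers - k11 - k12 - k21) // neither
      // 3) touch the single affected model field only if llr clears the bar
      if (llr(k11, k12, k21, k22) > threshold)
        updateLuceneField(aItem, matrix, itemId)
    }
  }
}
```

The point of the sketch is the access pattern: one interaction touches one input hashmap, a handful of counters, and at most one Lucene field, which is what makes the Kappa update cheap relative to the full Lambda re-calc.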