Hi,

We're evaluating Beam and trying to figure out whether it meets our needs (we suspect it does!) and, if so, how best to set up our code.
Many thanks in advance.

*Basic scenario:*

Data (for machine learning/prediction) comes in as a set of documents which are 100% independent. We want to apply some transformations to those documents on a per-doc basis.

- Many of the transformations are easily and obviously encapsulated in Beam user code.
- *However, we'd also like to enrich the data via queries to external databases. How do we do that efficiently* (largely in time, but also in compute resources)*?*

*Additional constraints:*

- We are developing on Google Cloud, if it matters.
- Ideally we can achieve the below in Python (versus Java), to ease porting existing code.

*Some examples:*

1) *Key-value query.* Text comes in; we'd like to do some pre-processing on the text and then look up certain subsets of it against an external database. The mappings found need to be associated with the underlying text. E.g., imagine we're doing Named Entity Recognition and trying to augment it with a large, fine-grained external gazetteer. Given "I went to the Los Angeles Dodgers game." (RIP), perhaps we generate ngrams ("I", ..., "I went", "went to", ..., "I went to", ..., "Los Angeles Dodgers", ...), find that "Los Angeles Dodgers" maps to entity 123456 and "Los Angeles" maps to 234567, and map those entities back onto the underlying document.

2) *More sophisticated query.* We do a bunch of calculations on the data, derive some intermediary result, and need to look that result up against an external database to generate a final result. These queries might require a bit more SQL sophistication (table joins, looking up multiple rows and filtering, etc.).

Scenario #1 is more important than #2 because, worst case, we can probably cast most of our external enrichment into a key-value paradigm.

*The concern: the database query becomes the choke point*

The most naive implementation would seem to be user code that grabs each doc and does a remote database lookup for that doc.
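For concreteness, here is a minimal sketch of what scenario #1's per-document step might look like as plain Python that could run inside a Beam `Map`/`ParDo`. The in-memory `gazetteer` dict and the `tag_entities`/`ngrams` names are illustrative stand-ins, not anything from Beam itself; in the naive version, the dict lookup would instead be a remote database call per ngram or per doc.

```python
def ngrams(text, max_n=3):
    """All word ngrams of the text, up to max_n words long."""
    words = text.split()
    return [" ".join(words[i:i + n])
            for n in range(1, max_n + 1)
            for i in range(len(words) - n + 1)]

def tag_entities(doc, gazetteer):
    """Return the doc plus every (ngram, entity_id) pair found in the gazetteer."""
    hits = [(g, gazetteer[g]) for g in ngrams(doc) if g in gazetteer]
    return {"text": doc, "entities": hits}

# In a pipeline this runs per element, e.g.:
#   docs | beam.Map(tag_entities, gazetteer=lookup_dict)
# If lookup_dict were replaced by a remote client call, every element would
# pay a round-trip -- which is exactly the choke-point concern.
```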
We initially had this implemented (outside of Beam), but found that (unsurprisingly) *the round-trip to the database became a blocker*: code would just sit waiting on the DB round-trip, and processing slowed down dramatically (compared to keeping the db local via an, ultimately unmanageable, local SQLite instance). Our solution was to 1) implement multi-threading (to limit how much the db queries block) and 2) implement local caching of lookups (using https://dogpilecache.readthedocs.io/en/latest/usage.html). Both of these dramatically sped things up in the single-machine (non-Beam) scenario.

*Is there an equivalent (direct code or design pattern) of either #1 or #2 in Beam?* (The answer to #1 might just be that Beam automatically adds more documents to be processed when it realizes things are slower than they "should be"?)

*Possible solution?: pull the table(s), in full, down to the Beam cluster*

The tables we're working with aren't terribly large by modern standards (although I'd like to design for this potentially changing): maybe a few GB at most, and probably easily shaved down to 100s of MBs. Conceptually, if it were quicker, we could (I think?) pull the entire table down into a PCollection and then use that data "locally" (i.e., within the Beam cluster).

- Presumably, for key-value lookup, we could write some query to efficiently cross-reference the two PCollections (i.e., the db and the actual source data). (...although I haven't investigated Beam deeply enough to confirm this is realistically doable?)
- For more complex queries, it is less clear to me how to do this, because we'd basically need to be able to run SQL queries on the Beam cluster. (Possibly via https://beam.apache.org/documentation/dsls/sql/? Although I'm not clear how solid this is or whether it is available on Cloud Dataflow.)
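On questions #1 and #2: runners generally handle #1 for you by fanning bundles out across many workers (and Dataflow can autoscale workers), and for #2 a common pattern is a process-level cache used from inside a `DoFn`, much like the dogpile.cache setup. A hedged sketch, where `query_db` and `FAKE_TABLE` are hypothetical stand-ins for the real remote lookup:

```python
import functools

FAKE_TABLE = {"Los Angeles": 234567}   # stand-in for the remote database

def query_db(key):
    """Hypothetical remote lookup; counts its calls so the caching is visible."""
    query_db.calls += 1
    return FAKE_TABLE.get(key)

query_db.calls = 0

@functools.lru_cache(maxsize=100_000)
def cached_lookup(key):
    """Repeated keys on the same worker process skip the DB round-trip."""
    return query_db(key)

# Inside a beam.DoFn.process() (or a beam.Map fn), call cached_lookup(key).
# The cache lives for the lifetime of the worker process, across bundles;
# heavier clients can be created once per worker in DoFn.setup().
```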
*If pulling the table down into the cluster is the correct solution, is there a recommended way to do so?* Would it be via JDBC, or a Redis connector (for k:v) (https://beam.apache.org/documentation/io/built-in/), perhaps? (Although, per the top, pure Python would be preferable.)