Thanks so much for the help here, Robert. If I understood correctly, the best way to work with an unbounded source in Python right now is to fake it using an *initial create, state and timers*. If that's the case, do you have any code samples I can learn from? I'm not sure what that would look like, to be honest.
Finally, I'm thinking it might be simpler for me to use the provided PubSub streaming source. That'd mean using an external service to place the events in PubSub. It should also take care of checkpointing and handling errors!

On Tue, 13 Nov 2018 at 14:33, Robert Bradshaw <[email protected]> wrote:

> Just an addendum, you should be able to fake this in the meantime by
> starting with an initial create and using state and timers. One
> problem with the source as written above is that it will never
> checkpoint, meaning if your pipeline crashes it will start again from
> the beginning (including all the downstream processing). You could
> possibly get a resume token from your cursor, store that to state, and
> exit the DoFn. In your timer callback, you would resume reading for a
> while and then set another timer, just as before. See
> https://s.apache.org/beam-python-user-state-and-timers and related
> docs for all the details.
>
> Don't hesitate to respond to the thread if anything isn't clear or you
> have additional questions (or success stories!).
>
> - Robert
>
> On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw <[email protected]> wrote:
> >
> > The future of Beam sources is SDF, see
> > https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >
> > This is actively being worked on, but we're still in the present. For
> > bounded sources, you still may want to use the Source API (which, in
> > Python, is much closer to what SDF is settling down to be, so it
> > should be an easy port once that time comes). Unfortunately, Python
> > Streaming doesn't yet support anything but PubSub.
> >
> > On Tue, Nov 13, 2018 at 12:24 PM David Gasquez <[email protected]> wrote:
> > >
> > > Hey there,
> > >
> > > I've been exploring Apache Beam lately and I'm now working on my
> > > first production pipeline. The goal of this pipeline is to
> > > replicate a MongoDB Collection into BigQuery. To do that I want to
> > > read the MongoDB Oplog and use these events to update the table in
> > > BigQuery (happy to expand more on this if needed).
> > >
> > > The MongoDB Oplog is an unbounded source. I was wondering what the
> > > best practices are for dealing with this kind of source in Python.
> > > Currently, I'm using a custom beam.DoFn to read the Oplog inside a
> > > streaming pipeline. That said, I'm not sure how this will behave
> > > and how it can be improved (the pipeline relies on a
> > > beam.Create([0]) first step that seems hacky to me).
> > >
> > > These are the key snippets of the code:
> > >
> > > ```
> > > class OplogSourceDoFn(beam.DoFn):
> > >     def __init__(self, uri, database, collection):
> > >         super(OplogSourceDoFn, self).__init__()
> > >         self.uri = uri
> > >         self.database = database
> > >         self.collection = collection
> > >
> > >     def client(self):
> > >         self._client = pymongo.MongoClient(self.uri, readPreference="secondaryPreferred")
> > >         return self._client
> > >
> > >     def process(self, element):
> > >         client = self.client()
> > >         self.db = client.get_database(self.database)
> > >         self.col = self.db.get_collection(self.collection)
> > >         self.cursor = self.col.watch(full_document="updateLookup")
> > >
> > >         with self.cursor as stream:
> > >             for change in stream:
> > >                 yield change
> > >
> > > pipeline = (
> > >     p
> > >     | 'dummy_create' >> beam.Create([0])
> > >     | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
> > >     | 'process' >> beam.Map(process)
> > > )
> > > ```
> > >
> > > My hunch is that there's a way to leverage the StreamingCreate
> > > PTransform to read the MongoDB Oplog or any other external
> > > unbounded source. Alternatively, I've also seen a good example of
> > > how to create a BoundedSource. This might be similar for an
> > > unbounded one, but I think the Beam Programming Guide discourages
> > > building sources using the Source API.
> > >
> > > I'd appreciate any input or feedback you might have about the code
> > > and approach I'm taking!
> > >
> > > Thanks,
> > > David.
>
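
For anyone following the thread, here is a minimal sketch of the "read for a while, remember a resume token, exit, resume later" loop Robert describes. It is not Beam code and not a tested oplog reader: `read_batch`, `fake_stream` and the integer tokens are hypothetical names and stand-ins chosen for illustration. With pymongo, the opener would presumably wrap `collection.watch(resume_after=token)`, and the token would be the `_id` field of each change document; in a Beam DoFn the returned token would be written to user state and the next call made from a timer callback.

```python
from typing import Any, Callable, Iterator, List, Optional, Tuple

# Hypothetical event shape: (resume_token, payload). Tokens are plain ints
# here; a real change stream would hand back an opaque token document.
Event = Tuple[int, Any]


def read_batch(
    open_stream: Callable[[Optional[int]], Iterator[Event]],
    resume_token: Optional[int],
    max_events: int,
) -> Tuple[List[Any], Optional[int]]:
    """Read at most max_events events after resume_token, then stop.

    Returns the payloads read and the token to persist for the next call.
    If nothing new is available, the token is returned unchanged.
    """
    payloads: List[Any] = []
    for token, payload in open_stream(resume_token):
        payloads.append(payload)
        resume_token = token  # remember how far we got
        if len(payloads) >= max_events:
            break  # stop early so the caller can checkpoint
    return payloads, resume_token


def fake_stream(log: List[Any]) -> Callable[[Optional[int]], Iterator[Event]]:
    """Build an opener over an in-memory list, standing in for the oplog."""

    def open_stream(after: Optional[int]) -> Iterator[Event]:
        # Resume strictly after the last token seen, or from the start.
        start = 0 if after is None else after + 1
        return iter([(i, log[i]) for i in range(start, len(log))])

    return open_stream
```

Each call picks up where the previous one stopped, for example `batch, token = read_batch(fake_stream(events), token, 100)` in a loop, so a crash between calls only loses the batch in flight rather than restarting from the beginning.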
