@david, did you try out and refine the example that @robert mentioned in his last email?
I have a similar use-case and wanted to get some feedback from you before trying it out myself.

On Tue, 13 Nov 2018 at 16:15, Robert Bradshaw <[email protected]> wrote:

> On Tue, Nov 13, 2018 at 3:44 PM David Gasquez <[email protected]> wrote:
> >
> > Thanks so much for the help here Robert. If I understood it correctly, to work with an unbounded source in Python right now the best approach is to fake it using an initial create, state and timers.
>
> Yep, exactly.
>
> > If that's the case, do you have any code samples I can learn from? Not sure how that looks, to be honest.
>
> I can't think of any examples offhand, but it'd look something like
>
> class OplogSourceDoFn(beam.DoFn):
>
>   resume_token_spec = beam.DoFn.BagStateSpec('resume', beam.coders.BytesCoder())
>   timer_spec = beam.DoFn.TimerSpec('continue', TimeDomain.EVENT_TIME)
>
>   def __init__(self, uri, database, collection, batch_size=100):
>     super(OplogSourceDoFn, self).__init__()
>     self.uri = uri
>     self.database = database
>     self.collection = collection
>     self.batch_size = batch_size
>
>   def client(self):
>     self._client = pymongo.MongoClient(self.uri, readPreference="secondaryPreferred")
>     return self._client
>
>   def process(self, element, timer=DoFn.TimerParam(timer_spec), now=DoFn.TimestampParam):
>     # Set a timer to get things going. This will fire soon.
>     timer.set(beam.Timestamp.of(time.time()))
>
>   @on_timer(timer_spec)
>   def resume(self, timer=DoFn.TimerParam(timer_spec), resume_tokens=DoFn.StateParam(resume_token_spec)):
>     # Get the latest mongodb resume token, if any.
>     last_token = next(iter(resume_tokens.read()), None)
>
>     # Open the stream.
>     client = self.client()
>     self.db = client.get_database(self.database)
>     self.col = self.db.get_collection(self.collection)
>     self.cursor = self.col.watch(full_document="updateLookup", resume_after=last_token)
>
>     # Read at most batch_size elements from the stream.
>     change = None
>     with self.cursor as stream:
>       for _, change in zip(range(self.batch_size), stream):
>         yield change
>
>     # If anything was read, set the last one as our resume token.
>     if change:
>       resume_tokens.clear()
>       resume_tokens.add(change.get('_id'))
>
>     # Schedule this method to run again.
>     timer.set(beam.Timestamp.of(time.time()))
>
> Warning, this is email-window authored, completely untested code, so YMMV. But hopefully it serves as an example of the core concepts.
>
> > Finally, I'm thinking it might be simpler for me to use the provided PubSub streaming source. That'd mean using an external service to place the events in PubSub. It should also take care of checkpointing and handling errors!
>
> If you have somewhere you can run a process that reads from mongo and publishes to pubsub, that'd do too.
>
> > On Tue, 13 Nov 2018 at 14:33, Robert Bradshaw <[email protected]> wrote:
> >>
> >> Just an addendum, you should be able to fake this in the meantime by starting with an initial create and using state and timers. One problem with the source as written above is that it will never checkpoint, meaning if your pipeline crashes it will start again from the beginning (including all the downstream processing). You could possibly get a resume token from your cursor, store that to state, and exit the DoFn. In your timer callback, you would resume reading for a while and then set another timer, just as before. See https://s.apache.org/beam-python-user-state-and-timers and related docs for all the details.
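For my own understanding of the wiring piece: I'd expect a stateful DoFn like the one above to be kicked off roughly as in the sketch below. Completely untested on my end; the single keyed kick-off element (state and timers need keyed input, as far as I know) is my own assumption, and URI, DATABASE and COLLECTION are just the placeholders from David's snippet.

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# OplogSourceDoFn is the stateful DoFn sketched above; URI, DATABASE and
# COLLECTION are placeholders for real connection settings.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    changes = (
        p
        # One keyed dummy element: process() runs once and sets the first
        # timer; every later read happens in the timer callback. State and
        # timers are tracked per key, hence the (key, value) pair.
        | 'kickoff' >> beam.Create([('oplog', None)])
        | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
    )
```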
> >>
> >> Don't hesitate to respond to the thread if anything isn't clear or you have additional questions (or success stories!).
> >>
> >> - Robert
> >>
> >> On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw <[email protected]> wrote:
> >> >
> >> > The future of Beam sources is SDF, see https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >> >
> >> > This is actively being worked on, but we're still in the present. For bounded sources, you may still want to use the Source API (which, in Python, is much closer to what SDF is settling down to be, so it should be an easy port once that time comes). Unfortunately, Python Streaming doesn't yet support anything but PubSub.
> >> >
> >> > On Tue, Nov 13, 2018 at 12:24 PM David Gasquez <[email protected]> wrote:
> >> > >
> >> > > Hey there,
> >> > >
> >> > > I've been exploring Apache Beam lately and I'm now working on my first production pipeline. The goal of this pipeline is to replicate a MongoDB collection into BigQuery. To do that, I want to read the MongoDB Oplog and use these events to update the table in BigQuery (happy to expand more on this if needed).
> >> > >
> >> > > The MongoDB Oplog is an unbounded source. I was wondering what the best practices are for dealing with this kind of source in Python. Currently, I'm using a custom beam.DoFn to read the Oplog inside a streaming pipeline. That said, I'm not sure how this will behave and how it can be improved (the pipeline relies on a beam.Create([0]) first step that seems hacky to me).
> >> > >
> >> > > These are the key snippets of the code:
> >> > >
> >> > > ```
> >> > > class OplogSourceDoFn(beam.DoFn):
> >> > >     def __init__(self, uri, database, collection):
> >> > >         super(OplogSourceDoFn, self).__init__()
> >> > >         self.uri = uri
> >> > >         self.database = database
> >> > >         self.collection = collection
> >> > >
> >> > >     def client(self):
> >> > >         self._client = pymongo.MongoClient(self.uri, readPreference="secondaryPreferred")
> >> > >         return self._client
> >> > >
> >> > >     def process(self, element):
> >> > >         client = self.client()
> >> > >         self.db = client.get_database(self.database)
> >> > >         self.col = self.db.get_collection(self.collection)
> >> > >         self.cursor = self.col.watch(full_document="updateLookup")
> >> > >
> >> > >         with self.cursor as stream:
> >> > >             for change in stream:
> >> > >                 yield change
> >> > >
> >> > > pipeline = (
> >> > >     p
> >> > >     | 'dummy_create' >> beam.Create([0])
> >> > >     | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
> >> > >     | 'process' >> beam.Map(process)
> >> > > )
> >> > > ```
> >> > >
> >> > > My hunch is that there's a way to leverage the StreamingCreate PTransform to read the MongoDB Oplog or any other external unbounded source. Alternatively, I've also seen a good example of how to create a BoundedSource. This might be similar for an unbounded one, but I think the Beam Programming Guide discourages building sources using the Source API.
> >> > >
> >> > > I'd appreciate any input or feedback you might have about the code and approach I'm taking!
> >> > >
> >> > > Thanks,
> >> > > David.

--
Pascal Gula
Senior Data Engineer / Scientist
+49 (0)176 34232684
www.plantix.net
PEAT GmbH
Kastanienallee 4
10435 Berlin // Germany
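One more thought on Robert's alternative of running a separate process that reads from mongo and publishes to PubSub: I picture that relay as something like the untested sketch below. PROJECT, TOPIC, URI, DATABASE and COLLECTION are placeholders, and bson's json_util is just one way to serialise the change documents.

```
import pymongo
from bson import json_util
from google.cloud import pubsub_v1

# Standalone relay: tail the MongoDB change stream and hand each change to
# Pub/Sub, which then takes care of durability for the Beam pipeline.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT, TOPIC)

client = pymongo.MongoClient(URI, readPreference="secondaryPreferred")
collection = client.get_database(DATABASE).get_collection(COLLECTION)

with collection.watch(full_document="updateLookup") as stream:
    for change in stream:
        # Serialise the change document and publish it as bytes.
        publisher.publish(topic_path, json_util.dumps(change).encode("utf-8"))
```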
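On the Beam side, the provided PubSub streaming source (which, per Robert's note, is the only unbounded source Python streaming supports today) would then be the entry point. Again an untested sketch; the topic path and the JSON parsing step are assumptions on my part.

```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from bson import json_util

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    changes = (
        p
        # Read the relayed change events; messages arrive as bytes.
        | 'read_changes' >> beam.io.ReadFromPubSub(topic='projects/PROJECT/topics/TOPIC')
        | 'parse' >> beam.Map(lambda msg: json_util.loads(msg.decode('utf-8')))
        # Downstream: turn the change documents into BigQuery updates.
    )
```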
