Hey there, I've been exploring Apache Beam lately and I'm now working on my first *production* pipeline. The goal of this pipeline is to replicate a MongoDB collection into BigQuery. To do that, I want to read the MongoDB oplog <https://docs.mongodb.com/manual/core/replica-set-oplog/> and use its events to update the table in BigQuery (happy to expand on this if needed).
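For the "use these events to update the table" step, a minimal sketch of one way to flatten a change-stream event into a BigQuery row. It assumes the target is an append-only changelog table with a hypothetical schema (`doc_id`, `op`, `is_deleted`, `document`) that is later collapsed to the latest state per `doc_id`. The event fields used (`operationType`, `documentKey`, `fullDocument`) are the standard MongoDB change-event fields; `change_to_row` itself is an illustrative helper, not an existing API.

```python
import json
from typing import Any, Dict, Optional


def change_to_row(change: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Flatten one change-stream event into a row for a hypothetical changelog table."""
    op = change.get("operationType")
    if op not in ("insert", "update", "replace", "delete"):
        # Skip events such as "drop" or "invalidate" that carry no document key.
        return None
    # Deletes have no fullDocument; with updateLookup it can also be None if the
    # document was deleted before the lookup happened.
    doc = change.get("fullDocument")
    return {
        "doc_id": str(change["documentKey"]["_id"]),
        "op": op,
        "is_deleted": op == "delete",
        # ObjectId / datetime values are not JSON-serialisable, so fall back to str().
        "document": None if doc is None else json.dumps(doc, default=str, sort_keys=True),
    }
```

Keeping every event as its own row avoids per-event updates against BigQuery; the "current" view of the collection would then be derived from the latest row per `doc_id`, which is a design assumption rather than the only option.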
The MongoDB oplog is an unbounded source. I was wondering what the best practices are for dealing with this kind of source in Python. Currently, I'm using a custom `beam.DoFn` to read the *oplog* inside a streaming pipeline. That said, I'm not sure how this will behave or how it can be improved (the pipeline relies on a `beam.Create([0])` first step that seems hacky to me <https://stackoverflow.com/questions/46564730/google-dataflow-read-from-spanner>).

These are the key snippets of the code:

```python
import apache_beam as beam
import pymongo


class OplogSourceDoFn(beam.DoFn):
    def __init__(self, uri, database, collection):
        super(OplogSourceDoFn, self).__init__()
        self.uri = uri
        self.database = database
        self.collection = collection

    def client(self):
        self._client = pymongo.MongoClient(self.uri, readPreference="secondaryPreferred")
        return self._client

    def process(self, element):
        client = self.client()
        self.db = client.get_database(self.database)
        self.col = self.db.get_collection(self.collection)
        # Open a change stream; "updateLookup" makes update events carry the full document.
        self.cursor = self.col.watch(full_document="updateLookup")
        with self.cursor as stream:
            for change in stream:
                yield change


# p, URI, DATABASE, COLLECTION and process are defined elsewhere in the module.
pipeline = (
    p
    | 'dummy_create' >> beam.Create([0])
    | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
    | 'process' >> beam.Map(process)
)
```

My hunch is that there's a way to leverage the `StreamingCreate` PTransform <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py> to read the MongoDB oplog or any other external unbounded source. Alternatively, I've also seen a good example of how to create a `BoundedSource` <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L737>. Something similar might work for an unbounded one, but I think the Beam documentation discourages building new sources with the Source API <https://beam.apache.org/documentation/io/authoring-overview/#read-transforms>.

I'd appreciate any input or feedback you might have about the code and the approach I'm taking!

Thanks,
David.
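On the "how will this behave" question: as written, `process` calls `watch()` without a starting point, so if a worker restarts, the stream begins again from "now" and events in between are missed. A minimal sketch of checkpointing the change stream's resume token (the `_id` field of each change event) is below. `tail_with_checkpoints` and the `save_token` callback are hypothetical; where tokens are stored (a small MongoDB collection, a file, etc.) is an assumption left open.

```python
from typing import Any, Callable, Dict, Iterable, Iterator

Change = Dict[str, Any]


def tail_with_checkpoints(
    stream: Iterable[Change],
    save_token: Callable[[Any], None],
) -> Iterator[Change]:
    """Yield change events and record each event's resume token after it is handed on."""
    for change in stream:
        yield change
        # Runs when the consumer asks for the next event, i.e. after it received this one.
        # The "_id" of a change-stream event is its resume token.
        save_token(change["_id"])
```

Inside the DoFn this would wrap the stream, e.g. `yield from tail_with_checkpoints(stream, store.save)`, and on restart the last saved token would be passed to `self.col.watch(full_document="updateLookup", resume_after=token)`. Note that a runner receiving a yielded element does not necessarily mean it has been durably committed, so this is best-effort resumption rather than an exactly-once guarantee.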
