Hey there,

I've been exploring Apache Beam lately and I'm now working on my first
*production* pipeline. The goal of this pipeline is to replicate a MongoDB
collection into BigQuery. To do that, I want to read the MongoDB Oplog
<https://docs.mongodb.com/manual/core/replica-set-oplog/> and use these
events to update the table in BigQuery (happy to expand on this if
needed).

The MongoDB Oplog is an unbounded source. I was wondering what the best
practices are for dealing with this kind of source in Python. Currently, I'm
using a custom beam.DoFn to read the *Oplog* inside a streaming pipeline. That
said, I'm not sure how this will behave or how it can be improved (the
pipeline relies on a beam.Create([0]) first step that seems hacky to me
<https://stackoverflow.com/questions/46564730/google-dataflow-read-from-spanner>
).

These are the key snippets of the code:

```
import apache_beam as beam
import pymongo


class OplogSourceDoFn(beam.DoFn):
    def __init__(self, uri, database, collection):
        super(OplogSourceDoFn, self).__init__()
        self.uri = uri
        self.database = database
        self.collection = collection

    def client(self):
        # Prefer secondaries so the primary is not loaded by the reader.
        self._client = pymongo.MongoClient(
            self.uri, readPreference="secondaryPreferred")
        return self._client

    def process(self, element):
        # `element` is only the dummy seed; it is never used.
        client = self.client()
        self.db = client.get_database(self.database)
        self.col = self.db.get_collection(self.collection)
        # Open a change stream that also returns the full updated document.
        self.cursor = self.col.watch(full_document="updateLookup")

        # Runs for as long as the change stream stays open.
        with self.cursor as stream:
            for change in stream:
                yield change

# p, process, URI, DATABASE and COLLECTION are defined elsewhere.
pipeline = (
    p
    | 'dummy_create' >> beam.Create([0])
    | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
    | 'process' >> beam.Map(process)
)
```
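
The shape of this pattern can be reproduced without Beam or MongoDB. What
follows is only a minimal sketch under stated assumptions: FakeChangeStream is
a hypothetical stand-in for the cursor returned by col.watch(), and the
module-level process() function mimics OplogSourceDoFn.process. It shows that
the single seed element starts one call that keeps yielding for as long as the
stream produces events.

```python
import itertools


class FakeChangeStream:
    """Hypothetical stand-in for the cursor returned by collection.watch()."""

    def __init__(self, events):
        self._events = iter(events)
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Record that the stream was released when the with-block exits.
        self.closed = True
        return False

    def __iter__(self):
        return self._events


def process(element, stream):
    """Mimics OplogSourceDoFn.process: one input element, many outputs."""
    with stream as changes:
        for change in changes:
            yield change


# An endless event source, like a change stream that never finishes.
unbounded = ({"operationType": "insert", "seq": n} for n in itertools.count())
stream = FakeChangeStream(unbounded)

# The single seed element (the 0 from beam.Create([0])) starts one call to
# process(), which produces output lazily for as long as it is consumed.
outputs = process(0, stream)
first_three = list(itertools.islice(outputs, 3))
print([event["seq"] for event in first_three])
```

In this sketch the call never finishes on its own; the stream is released only
when the generator is closed from outside. That is the part I'm unsure about:
how a runner deals with a process() call that never returns.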

My hunch is that there's a way to leverage the StreamingCreate PTransform
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py>
to read the MongoDB Oplog or any other external unbounded source.
Alternatively, I've also seen a good example of how to create a BoundedSource
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L737>.
An unbounded source might be built in a similar way, but I think the Beam
Programming Guide discourages building sources using the Source API
<https://beam.apache.org/documentation/io/authoring-overview/#read-transforms>
.

I'd appreciate any input or feedback you might have about the code and
approach I'm taking!

Thanks,
David.
