The future of Beam sources is Splittable DoFn (SDF); see
https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

SDF is actively being worked on, but we're still in the present. For
bounded sources, you may still want to use the Source API (which, in
Python, is much closer to what SDF is settling down to be, so it
should be an easy port once that time comes). Unfortunately, streaming
in the Python SDK doesn't yet support any source other than PubSub.
On Tue, Nov 13, 2018 at 12:24 PM David Gasquez <[email protected]> wrote:
>
> Hey there,
>
> I've been exploring Apache Beam lately and I'm now working on my first
> production pipeline. The goal of this pipeline is to replicate a MongoDB
> collection into BigQuery. To do that, I want to read the MongoDB Oplog and use
> these events to update the table in BigQuery (happy to expand more on this
> if needed).
>
> The MongoDB Oplog is an unbounded source. I was wondering what the best
> practices are for dealing with this kind of source in Python. Currently, I'm using a
> custom beam.DoFn to read the Oplog inside a streaming pipeline. That said,
> I'm not sure how this will behave or how it can be improved (the pipeline
> relies on a beam.Create([0]) first step that seems hacky to me).
>
> These are the key snippets of the code:
>
> ```
> class OplogSourceDoFn(beam.DoFn):
>     def __init__(self, uri, database, collection):
>         super(OplogSourceDoFn, self).__init__()
>         self.uri = uri
>         self.database = database
>         self.collection = collection
>
>     def client(self):
>         self._client = pymongo.MongoClient(
>             self.uri, readPreference="secondaryPreferred")
>         return self._client
>
>     def process(self, element):
>         client = self.client()
>         self.db = client.get_database(self.database)
>         self.col = self.db.get_collection(self.collection)
>         self.cursor = self.col.watch(full_document="updateLookup")
>
>         with self.cursor as stream:
>             for change in stream:
>                 yield change
>
> pipeline = (
>     p
>     | 'dummy_create' >> beam.Create([0])
>     | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
>     | 'process' >> beam.Map(process)
> )
> ```
>
> My hunch is that there's a way to leverage the StreamingCreate PTransform to
> read the MongoDB Oplog or any other external unbounded source. Alternatively,
> I've also seen a good example of how to create a BoundedSource. This might be
> similar for an unbounded one, but I think the Beam Programming Guide
> discourages building sources using the Source API.
>
> I'd appreciate any input or feedback you might have about the code and 
> approach I'm taking!
>
> Thanks,
> David.
