On Tue, Nov 13, 2018 at 3:44 PM David Gasquez <[email protected]>
wrote:
>
> Thanks so much for the help here Robert. If I understood it correctly, to
> work with an unbounded source in Python right now the best approach is to
> fake it using an initial create, state and timers.

Yep, exactly.

> If that's the case, do you have any code samples I can learn from? Not
> sure how that looks to be honest.

I can't think of any examples offhand, but it'd look something like


import time

import apache_beam as beam
import pymongo
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
from apache_beam.utils.timestamp import Timestamp


class OplogSourceDoFn(beam.DoFn):

    # Resume tokens are documents (dicts), so pickle them rather than
    # treating them as raw bytes.
    resume_token_spec = BagStateSpec('resume', beam.coders.PickleCoder())
    # The Python SDK calls the event-time domain WATERMARK.
    timer_spec = TimerSpec('continue', TimeDomain.WATERMARK)

    def __init__(self, uri, database, collection, batch_size=100):
        super(OplogSourceDoFn, self).__init__()
        self.uri = uri
        self.database = database
        self.collection = collection
        self.batch_size = batch_size

    def client(self):
        self._client = pymongo.MongoClient(
            self.uri, readPreference="secondaryPreferred")
        return self._client

    def process(self, element, timer=beam.DoFn.TimerParam(timer_spec),
                now=beam.DoFn.TimestampParam):
        # State and timers are per key, so the input elements must be
        # (key, value) pairs.
        # Set a timer to get things going.  This will fire soon.
        timer.set(Timestamp.of(time.time()))

    @on_timer(timer_spec)
    def resume(self, timer=beam.DoFn.TimerParam(timer_spec),
               resume_tokens=beam.DoFn.StateParam(resume_token_spec)):

        # Get the latest mongodb resume token, if any.
        last_token = next(iter(resume_tokens.read()), None)

        # Open the stream.
        client = self.client()
        self.db = client.get_database(self.database)
        self.col = self.db.get_collection(self.collection)
        self.cursor = self.col.watch(
            full_document="updateLookup", resume_after=last_token)

        # Read at most batch_size elements from the stream.
        change = None
        with self.cursor as stream:
            for _, change in zip(range(self.batch_size), stream):
                yield change

        # If anything was read, set the last one as our resume token.
        if change is not None:
            resume_tokens.clear()
            resume_tokens.add(change.get('_id'))

        # Schedule this method to run again.
        timer.set(Timestamp.of(time.time()))


Warning, this is email-window authored, completely untested code, so YMMV.
But hopefully it serves as an example of the core concepts.

> Finally, I'm thinking it might be simpler for me to use the provided
> PubSub streaming source. That'd mean using an external service to place the
> events in PubSub. It should also take care of checkpointing and handling
> errors!

If you have somewhere you can run a process that reads from mongo and
publishes to pubsub, that'd do too.
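
The core loop of such a process could be as small as the sketch below. It is
untested against real services and makes several assumptions:
`forward_changes` is a hypothetical helper, `stream` would in practice be
something like `collection.watch(full_document="updateLookup")`, and
`publish` would wrap a Pub/Sub publisher client. Here both are plain Python
objects so the logic can be run anywhere.

```python
import json


def forward_changes(stream, publish, max_events=None):
    """Publish each change from `stream`; return the last resume token seen.

    `stream` is any iterator of change documents (dicts with an '_id' key).
    `publish` is any callable that accepts a bytes payload.
    """
    last_token = None
    for count, change in enumerate(stream, start=1):
        # default=str is a crude stand-in for BSON-aware serialization
        # (real change documents contain ObjectIds, timestamps, etc.).
        payload = json.dumps(change, default=str).encode('utf-8')
        publish(payload)
        # Remember where we got to, so a restart can resume from here.
        last_token = change['_id']
        if max_events is not None and count >= max_events:
            break
    return last_token


# Stand-ins for the change stream and the publisher.
sent = []
fake_stream = iter([{'_id': {'_data': str(i)}, 'n': i} for i in range(3)])
last = forward_changes(fake_stream, sent.append, max_events=2)
```

The process would need to persist the returned token somewhere durable and
pass it back as `resume_after` when it restarts.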

> On Tue, 13 Nov 2018 at 14:33, Robert Bradshaw <[email protected]> wrote:
>>
>> Just an addendum, you should be able to fake this in the meantime by
>> starting with an initial create and using state and timers. One
>> problem with the source as written above is that it will never
>> checkpoint, meaning if your pipeline crashes it will start again from
>> the beginning (including all the downstream processing). You could
>> possibly get a resume token from your cursor, store that to state, and
>> exit the DoFn. In your timer callback, you would resume reading for a
>> while and then set another timer, just as before. See
>> https://s.apache.org/beam-python-user-state-and-timers and related
>> docs for all the details.
>>
>> Don't hesitate to respond to the thread if anything isn't clear or you
>> have additional questions (or success stories!).
>>
>> - Robert
>>
>> On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw <[email protected]> wrote:
>> >
>> > The future of Beam sources is SDF, see
>> > https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> >
>> > This is actively being worked on, but we're still in the present. For
>> > bounded sources, you still may want to use the Source API (which, in
>> > Python, is much closer to what SDF is settling down to be, so it
>> > should be an easy port once that time comes). Unfortunately, Python
>> > Streaming doesn't yet support anything but PubSub.
>> >
>> > On Tue, Nov 13, 2018 at 12:24 PM David Gasquez <[email protected]> wrote:
>> > >
>> > > Hey there,
>> > >
>> > > I've been exploring Apache Beam lately and I'm now working on my
>> > > first production pipeline. The goal of this pipeline is to replicate a
>> > > MongoDB Collection into Big Query. To do that I want to read MongoDB Oplog
>> > > and use these events to update the table in Big Query (happy to expand more
>> > > on this if needed).
>> > >
>> > > MongoDB Oplog is an unbounded source. I was wondering what the best
>> > > practices are for dealing with this kind of source in Python. Currently,
>> > > I'm using a custom beam.DoFn to read the Oplog inside a streaming
>> > > pipeline. That said, I'm not sure how this will behave or how it can be
>> > > improved (the pipeline relies on a beam.Create([0]) first step that
>> > > seems hacky to me).
>> > >
>> > > These are the key snippets of the code:
>> > >
>> > > ```
>> > > class OplogSourceDoFn(beam.DoFn):
>> > >     def __init__(self, uri, database, collection):
>> > >         super(OplogSourceDoFn, self).__init__()
>> > >         self.uri = uri
>> > >         self.database = database
>> > >         self.collection = collection
>> > >
>> > >     def client(self):
>> > >         self._client = pymongo.MongoClient(
>> > >             self.uri, readPreference="secondaryPreferred")
>> > >         return self._client
>> > >
>> > >     def process(self, element):
>> > >         client = self.client()
>> > >         self.db = client.get_database(self.database)
>> > >         self.col = self.db.get_collection(self.collection)
>> > >         self.cursor = self.col.watch(full_document="updateLookup")
>> > >
>> > >         with self.cursor as stream:
>> > >             for change in stream:
>> > >                 yield change
>> > >
>> > > pipeline = (
>> > >     p
>> > >     | 'dummy_create' >> beam.Create([0])
>> > >     | 'read_oplog' >> beam.ParDo(
>> > >         OplogSourceDoFn(URI, DATABASE, COLLECTION))
>> > >     | 'process' >> beam.Map(process)
>> > > )
>> > > ```
>> > >
>> > > My hunch is that there's a way to leverage the StreamingCreate
>> > > PTransform to read MongoDB Oplog or any other external unbounded source.
>> > > Alternatively, I've also seen a good example on how to create a
>> > > BoundedSource. This might be similar for an unbounded one but I think the
>> > > Beam Programming Guide discourages building sources using the Source API.
>> > >
>> > > I'd appreciate any input or feedback you might have about the code
>> > > and approach I'm taking!
>> > >
>> > > Thanks,
>> > > David.
