Thank you again for your prompt response.

I will give it a try and will come back to you.

*Pedro Lima Monteiro*

On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> I would recommend checking out the Flink RabbitMQ Source for examples:
> https://github.com/apache/flink/blob/master/flink-
> connectors/flink-connector-rabbitmq/src/main/java/org/
> apache/flink/streaming/connectors/rabbitmq/RMQSource.java
>
> For your case, you should extend the `RichSourceFunction` which provides
> additional access to override the `open()` life cycle method.
> In that method, you instantiate your MongoDB client connection and  fetch
> the cursor. In the `run()` method, you should essentially have a while loop
> that polls the MongoDB cursor and emits the fetched documents using the
> `SourceContext`.
>
> If your also looking to implement a MongoDB source that works with Flink’s
> checkpointing for exactly-once, be sure to check out:
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/stream/state.html#stateful-source-functions
>
> Cheers,
> Gordon
>
> On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (
> pedro.mlmonte...@gmail.com) wrote:
>
> Dear Tzu-Li,
>
> Thank you so much for your prompt response.
>
> Lets assume I have a variable, in Java, env which is my
> StreamExecutionEnvironment. When I go ahead and attempt to do:
>
>> ​env.addSource();
>>
>
> ​It requests an implementation of a Source Function interface:
> ​
>
>> env.addSource(new SourceFunction<Document>() {
>
>
>> ​​
>> @Override
>
>             public void run(SourceFunction.SourceContext<Document> ctx)
>> throws Exception {
>
>
>> ​// TO DO​
>>
>             }
>
>
>>             @Override
>
>             public void cancel() {
>
>
>> ​// TO DO​
>>
>             }
>
>         });
>
> ​And this is where I'm somehow stuck. I do not understand how should I
> access my MongoDB's cursor in any of this methods (I suppose the most
> adequate would be the "run" method) in a way it would allow me to return a
> new MongoDB document as it arrived to the database from another source.
>
> Once again, thank you so much for your help.
>
> I will wait to hear from you!​
>
> Cumprimentos,
>
> *Pedro Lima Monteiro*
>
> On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Pedro!
>>
>> This is definitely possible, by simply writing a Flink `SourceFunction`
>> that uses MongoDB clients to fetch the data.
>> It should be straightforward and works well with MongoDB’s cursor APIs.
>>
>> Could you explain a bit which part in particular you were stuck with?
>>
>> Cheers,
>> Gordon
>>
>>
>> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (
>> pedro.mlmonte...@gmail.com) wrote:
>>
>> Good morning,
>>
>> I am trying to get data from MongoDB to be analysed in Flink.
>> I would like to know if it is possible to stream data from MongoDB into
>> Flink. I have looked into Flink's source function to add in the addSource
>> method of the StreamExecutionEnvironment but I had no luck.
>> Can anyone help me out?
>> Thanks.
>>
>> *Pedro Lima Monteiro*
>>
>>
>

Reply via email to