Dear Tzu-Li,

Thank you so much for your prompt response.

Lets assume I have a variable, in Java, env which is my
StreamExecutionEnvironment. When I go ahead and attempt to do:

> ​env.addSource();

​It requests an implementation of a Source Function interface:

> env.addSource(new SourceFunction<Document>() {

> ​​
> @Override

            public void run(SourceFunction.SourceContext<Document> ctx)
> throws Exception {

> ​// TO DO​

>             @Override

            public void cancel() {

> ​// TO DO​


​And this is where I'm somehow stuck. I do not understand how should I
access my MongoDB's cursor in any of this methods (I suppose the most
adequate would be the "run" method) in a way it would allow me to return a
new MongoDB document as it arrived to the database from another source.

Once again, thank you so much for your help.

I will wait to hear from you!​


*Pedro Lima Monteiro*

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <>

> Hi Pedro!
> This is definitely possible, by simply writing a Flink `SourceFunction`
> that uses MongoDB clients to fetch the data.
> It should be straightforward and works well with MongoDB’s cursor APIs.
> Could you explain a bit which part in particular you were stuck with?
> Cheers,
> Gordon
> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (
> wrote:
> Good morning,
> I am trying to get data from MongoDB to be analysed in Flink.
> I would like to know if it is possible to stream data from MongoDB into
> Flink. I have looked into Flink's source function to add in the addSource
> method of the StreamExecutionEnvironment but I had no luck.
> Can anyone help me out?
> Thanks.
> *Pedro Lima Monteiro*

Reply via email to