Thank you,

i will study that. it is a bit more raw i would say. The thing is my source
is Kafka. I will have to see how to combine all of that altogether in the
most elegant way possible. Will get back to you on this, after i scratch my
head enough.

Best,

Daniel

On Wed, May 25, 2016 at 11:02 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> I see what you mean now. The Akka Streams API is very interesting, in how
> they allow async calls.
>
> For Flink, I think you could implement it as a custom source that listens
> for the change stream, starts futures to get data from the database and
> emits elements when the future completes. I quickly sketched such an
> approach:
>
>
> public static class MyDBSource implements ParallelSourceFunction<String> {
>     private static final long serialVersionUID = 1L;
>
>     private volatile boolean running = true;
>
>     @Override
>     public void run(final SourceContext<String> ctx) throws Exception {
>         ChangelogConnection log = new ChangelogConnection();
>         DB db = new DB();
>
>         final Object checkpointLock = ctx.getCheckpointLock();
>
>         while (running) {
>             // try and fetch next changelog item
>             Change change = log.getNextChange();
>
>             DB.fetch(change, new Future() {
>                 public void complete(String data) {
>                     synchronized (checkpointLock) {
>                         ctx.collect(data);
>                     }
>                 }
>             });
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> I hope that helps.
>
> -Aljoscha
>
> On Wed, 25 May 2016 at 12:21 Maatary Okouya <maatarioko...@gmail.com>
> wrote:
>
>> Maybe the following can illustrate better what i mean
>> http://doc.akka.io/docs/akka/2.4.6/scala/stream/stream-integrations.html#Integrating_with_External_Services
>>
>> On Wed, May 25, 2016 at 5:16 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> there is no functionality to have asynchronous calls in user functions
>>> in Flink.
>>>
>>> The asynchronous action feature in Spark is also not meant for such
>>> things, it is targeted at programs that need to pull all data to the
>>> application master. In Flink this is not necessary because you can specify
>>> a whole plan of operations before executing them.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 24 May 2016 at 20:43 Maatary Okouya <maatarioko...@gmail.com>
>>> wrote:
>>>
>>>> I'm looking for a way to avoid thread starvation in my tasks, by
>>>> returning future but i don't see how is that possible.
>>>>
>>>> Hence i would like to know, how flink handle the case where in your job
>>>> you have to perform network calls (I use akka http or spray) or any IO
>>>> operation and use the result of it.
>>>>
>>>> In sparks i see asynchronous action and so on. I don't see any
>>>> equivalent in apache flink. How does it works ? is it supported, or the
>>>> network call and any io operation have to be synchronous ?
>>>>
>>>> any help, indication, reads and so on would be appreciated
>>>>
>>>
>>

Reply via email to