Re: asynchronous IO question

2021-10-07 Thread Arvid Heise
>
> 1. If the future completes exceptionally, i.e.
> resultFuture.completeExceptionally(throwable);
>
> does the input message get discarded?
>
If you complete the future exceptionally, the exception bubbles up and causes
the task to fail. Fail-over then determines what happens, but in most cases
the job will restart from an earlier checkpoint. If the source can rewind to
the corresponding offset, Flink will replay all messages from that checkpoint
onward and resubmit them to async IO.

That means you will not lose any input messages, but you will issue duplicate
requests (for everything between the checkpoint and the failure).
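To make the replay arithmetic concrete, here is a small plain-Java simulation (no Flink involved). It assumes one request in flight at a time, a checkpoint that covers every offset before `checkpointOffset`, and a single failure; `ReplaySimulation` and its methods are made-up names for illustration, and real fail-over behavior depends on the restart strategy and on the source.

```java
import java.util.Map;
import java.util.TreeMap;

class ReplaySimulation {

    // Counts how often the request for each offset is sent when the request for
    // offset failAt completes exceptionally and the job restarts from a checkpoint
    // that covers all offsets before checkpointOffset.
    // Simplification: one request in flight at a time, a single failure.
    static Map<Integer, Integer> requestCounts(int total, int checkpointOffset, int failAt) {
        Map<Integer, Integer> counts = new TreeMap<>();
        // First run: requests are sent up to and including the one that fails.
        for (int offset = 0; offset <= failAt; offset++) {
            counts.merge(offset, 1, Integer::sum);
        }
        // After the restart, the source rewinds to the checkpoint and replays the rest.
        for (int offset = checkpointOffset; offset < total; offset++) {
            counts.merge(offset, 1, Integer::sum);
        }
        return counts;
    }

    // Number of offsets whose request was sent more than once.
    static long duplicatedOffsets(Map<Integer, Integer> counts) {
        return counts.values().stream().filter(c -> c > 1).count();
    }
}
```

With `requestCounts(10, 4, 7)`, offsets 4 to 7 are requested twice and all other offsets once, so nothing is lost. With a larger async capacity, requests that were still in flight at the moment of failure may be repeated as well.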


> 2. Should the request be made on a dedicated ExecutorService or is the
> ForkJoinPool common pool sufficient?
>
Use a dedicated ExecutorService to get more control over the thread pool.
Usually, you want as many threads as the queue capacity you configured for
async IO. Also note that if you run async IO in parallel, each task instance
should have its own executor, as several tasks may reside in the same Java
process. With the common pool, you only get roughly as many threads as there
are cores, so in the usual parallel execution of async IO you end up with
about one thread per task instance. It gets even worse if you have several
async IO operators in your pipeline.
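A minimal JDK-only sketch of that advice, assuming the lookups are genuinely blocking calls: the pool is created in an `open()`-style method, so every parallel task instance gets its own pool, sized to the same value as the async operator's capacity, and it is shut down in `close()`. `PerTaskLookup` and `fetchBlocking` are hypothetical names; in a Flink job, the same lifecycle would live in `RichAsyncFunction#open` and `#close`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PerTaskLookup {

    private final int capacity;       // same value as the async operator's capacity
    private ExecutorService executor; // owned by this instance only, never shared

    PerTaskLookup(int capacity) {
        this.capacity = capacity;
    }

    // Mirrors RichAsyncFunction#open: runs once per parallel task instance.
    void open() {
        executor = Executors.newFixedThreadPool(capacity);
    }

    // Runs the blocking call on this instance's pool instead of the common pool.
    CompletableFuture<String> lookup(String key) {
        return CompletableFuture.supplyAsync(() -> fetchBlocking(key), executor);
    }

    // Hypothetical stand-in for a blocking HTTP call.
    private static String fetchBlocking(String key) {
        return "value-for-" + key;
    }

    // Number of threads available to this instance.
    int poolSize() {
        return ((ThreadPoolExecutor) executor).getMaximumPoolSize();
    }

    // Mirrors RichAsyncFunction#close: releases this instance's threads.
    void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Sizing the pool to the capacity means every element the operator admits can have a thread; fewer threads would leave admitted elements waiting in the executor's own work queue.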


> 3. If the REST API service, for example, returns 404, should you complete
> with an empty collection or can you omit line 32 entirely?
>
You should always call one of the complete* methods (complete or
completeExceptionally), so that Flink knows the element has been processed
and can continue processing.
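A plain-JDK analogy for why the completion matters, with a `CompletableFuture` standing in for Flink's `ResultFuture`: if one branch of the handler forgets to complete, the future stays pending until something else resolves it. In Flink, that role is played by the async operator's timeout (the default `AsyncFunction#timeout` completes the element exceptionally), and until then the element keeps occupying a slot in the operator's queue. `CompletionCheck` and `handleWithGap` are hypothetical names, and `orTimeout` here only imitates the operator's timeout.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CompletionCheck {

    // Deliberately incomplete handler: any status that is neither 200 nor 5xx
    // (for example 404) falls through without completing the future.
    static void handleWithGap(int status, CompletableFuture<Collection<String>> result) {
        if (status == 200) {
            result.complete(Collections.singletonList("body"));
        } else if (status >= 500) {
            result.completeExceptionally(new RuntimeException("Server processing error"));
        }
    }

    // Returns "completed", "failed", or "timed out" for the given status code.
    static String outcome(int status, long timeoutMillis) {
        CompletableFuture<Collection<String>> result = new CompletableFuture<>();
        handleWithGap(status, result);
        try {
            // orTimeout stands in for the async operator's timeout.
            result.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return "completed";
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException ? "timed out" : "failed";
        }
    }
}
```

A 404 in this handler is never reported as processed; it only ends when the timeout fires, which in a Flink job by default means a failure instead of an empty result.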

On Tue, Oct 5, 2021 at 10:06 PM Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:



Re: asynchronous IO question

2021-10-05 Thread Nicolaus Weidner
Hi Tom,

On Mon, Oct 4, 2021 at 10:42 PM tom yang  wrote:

> 1. If the future completes exceptionally, i.e.
> resultFuture.completeExceptionally(throwable);
>
> does the input message get discarded?
>

Which input do you mean here, "request"? It is not defined in your snippet;
did it get lost when you trimmed the unimportant parts?
By default, you only get the contained throwable. If you want to retain the
input, you have to enrich the exception with it.
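One way to do that enrichment, sketched with plain JDK futures: wrap the original throwable in an exception type that also carries the input key. `EnrichedRequestException` and `tagFailures` are hypothetical names, and a `CompletableFuture<String>` stands in for the request future. Completing exceptionally still fails the task; the difference is that the failure now names the element that caused it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical exception type that keeps the failed input next to the cause.
class EnrichedRequestException extends RuntimeException {
    private final String input;

    EnrichedRequestException(String input, Throwable cause) {
        super("Request failed for input '" + input + "'", cause);
        this.input = input;
    }

    String input() {
        return input;
    }
}

class EnrichingHandler {

    // Mirrors the failure branch of the snippet, but attaches the input key
    // instead of forwarding the bare throwable.
    static CompletableFuture<String> tagFailures(String key, CompletableFuture<String> request) {
        CompletableFuture<String> result = new CompletableFuture<>();
        request.whenComplete((body, throwable) -> {
            if (throwable != null) {
                result.completeExceptionally(new EnrichedRequestException(key, throwable));
            } else {
                result.complete(body);
            }
        });
        return result;
    }
}
```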

> 2. Should the request be made on a dedicated ExecutorService or is the
> ForkJoinPool common pool sufficient?
>

In general, I don't see a good reason to use a separate thread pool for the
requests here. They are asynchronous (non-blocking), are part of your Flink
job, and run on your TaskManagers. That is, unless something special in your
setup makes you suspect that they block other tasks.
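One detail of the snippet is relevant to this trade-off: `whenCompleteAsync(action)` without an executor runs the callback on CompletableFuture's default async executor, which is normally `ForkJoinPool.commonPool()`. The callback in the snippet is cheap, so that is usually harmless, but the JDK also offers `whenComplete(action)` (runs on the thread that completes the future, or on the caller if it is already complete) and `whenCompleteAsync(action, executor)`. The sketch below shows the second form; `CallbackThreads` and `namedPool` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class CallbackThreads {

    // Hypothetical helper: a single-thread pool whose thread has a recognizable name.
    static ExecutorService namedPool(String threadName) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, threadName);
            thread.setDaemon(true); // do not keep the JVM alive because of this pool
            return thread;
        });
    }

    // Registers the callback with an explicit executor and returns the name of
    // the thread it actually ran on.
    static String callbackThread(Executor callbackExecutor) {
        CompletableFuture<String> response = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CompletableFuture<String> done = response.whenCompleteAsync(
                (body, throwable) -> ranOn.set(Thread.currentThread().getName()),
                callbackExecutor);
        response.complete("body"); // simulates the HTTP client finishing the request
        done.join();               // wait until the callback has run
        return ranOn.get();
    }
}
```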


> 3. If the REST API service, for example, returns 404, should you complete
> with an empty collection or can you omit line 32 entirely?
>

This depends on your desired behavior: do you want it to complete normally
but without any results (your current behavior), or do you want it to
complete exceptionally?
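A sketch that makes this choice explicit with a flag, again using `CompletableFuture` as a stand-in for `ResultFuture` and `Map.Entry` in place of `Tuple2`. `StatusMapping` and `treatNotFoundAsError` are hypothetical names. Every branch completes the future exactly once, so no element is left pending whichever behavior you pick.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class StatusMapping {

    // Completes the future exactly once for every status code.
    // treatNotFoundAsError = false: 404 completes normally with no output.
    // treatNotFoundAsError = true:  404 completes exceptionally like other errors.
    static CompletableFuture<Collection<Map.Entry<String, String>>> complete(
            String key, int status, String body, boolean treatNotFoundAsError) {
        CompletableFuture<Collection<Map.Entry<String, String>>> result = new CompletableFuture<>();
        if (status == 200) {
            result.complete(Collections.singletonList(Map.entry(key, body)));
        } else if (status == 404 && !treatNotFoundAsError) {
            result.complete(Collections.emptyList()); // processed, nothing emitted
        } else {
            result.completeExceptionally(
                    new RuntimeException("Request for '" + key + "' failed with status " + status));
        }
        return result;
    }
}
```

With the flag off, this matches the current snippet; turning it on routes 404 through the same fail-over path as server errors.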

Best regards,
Nico




asynchronous IO question

2021-10-04 Thread tom yang
Hello,

I have recently run into an issue with RichAsyncFunction and wanted to get
some guidance from the community. Please see the snippet:

import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpStatus;

class AsyncFetchFromHttp extends RichAsyncFunction<String, Tuple2<String, String>> {

    private transient AsyncHttpClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new AsyncHttpClient();
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for the result
        CompletableFuture<HttpResponse<String>> future =
                client.sendAsync(request, HttpResponse.BodyHandlers.ofString());

        future.whenCompleteAsync((response, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
            } else if (response.statusCode() == HttpStatus.SC_OK) {
                resultFuture.complete(Collections.singleton(new Tuple2<>(key, response.body())));
            } else if (response.statusCode() == HttpStatus.SC_NOT_FOUND) {
                resultFuture.complete(Collections.emptyList()); // line 32
            } else {
                resultFuture.completeExceptionally(new RuntimeException("Server processing error"));
            }
        });
    }
}

1. If the future completes exceptionally, i.e.
   resultFuture.completeExceptionally(throwable);
   does the input message get discarded?
2. Should the request be made on a dedicated ExecutorService or is the
   ForkJoinPool common pool sufficient?
3. If the REST API service, for example, returns 404, should you complete
   with an empty collection or can you omit line 32 entirely?

Thanks!