Hi Arvid,

Thanks for the quick reply.

The second reference link from your answer
(http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2) is
not accessible, though. Could you share some more numbers from it? Are
these benchmarks published somewhere?

Without an actual IO call, the Async IO operator benchmark of 1.6K
records/ms per core translates to *1.6 million records/sec per core*, so an
8-core machine should give roughly *12.8 million records/sec*. Is this the
correct number? How does it compare with this benchmark article
<https://www.ververica.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>,
which talks about a total throughput of 4 million records/sec (without the
Async IO operator) on a cluster of about 10 machines with 16 cores each?
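
For reference, the arithmetic above can be checked with a few lines of
Python. This is only a sketch of the unit conversion: it assumes perfectly
linear scaling across cores, which neither benchmark claims, and the
function name is my own. The two sources also measure different workloads,
so the per-core figures are not directly comparable.

```python
def records_per_second(records_per_ms_per_core: float, cores: int) -> float:
    """Scale a per-core, per-millisecond rate to a per-second rate for a machine.

    Assumes perfectly linear scaling across cores (an idealisation).
    """
    return records_per_ms_per_core * 1_000 * cores


# Async IO microbenchmark figure: 1.6K records/ms per core, no real I/O.
per_core = records_per_second(1_600, cores=1)
eight_cores = records_per_second(1_600, cores=8)

# Blog article figure: 4 million records/sec on about 10 machines x 16 cores.
cluster_per_core = 4_000_000 / (10 * 16)

print(f"{per_core:,.0f} records/sec per core (microbenchmark)")
print(f"{eight_cores:,.0f} records/sec on 8 cores (microbenchmark, linear scaling)")
print(f"{cluster_per_core:,.0f} records/sec per core (cluster benchmark)")
```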

Ordered wait is indispensable for our use case because we need to call the
external (partner organisation) system's API endpoint for each incoming
record. Depending on the response from that API we decide how to process
the record, and the order needs to be preserved. This might not be a
problem if data ingestion rates were low. The real challenge comes from the
high-speed input stream (millions of events per second).

Are machines with more cores the answer, or is Flink not suitable for use
cases like this?
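
To make the question concrete, here is a rough sizing sketch based on
Little's law (requests in flight = arrival rate x response time). The input
values (1 million events/sec, 5 ms API latency, a capacity of 100 requests
per subtask) are assumed for illustration and are not measurements, and the
function names are my own. It gives a lower bound only, since it ignores
the extra waiting that ordered mode adds when a slow element blocks
emission.

```python
import math


def required_in_flight(events_per_sec: float, latency_ms: float) -> float:
    """Little's law: average number of requests in flight at a given rate and latency."""
    return events_per_sec * latency_ms / 1_000


def subtasks_needed(events_per_sec: float, latency_ms: float,
                    capacity_per_subtask: int) -> int:
    """Minimum number of async subtasks if each holds at most
    `capacity_per_subtask` pending requests."""
    in_flight = required_in_flight(events_per_sec, latency_ms)
    return math.ceil(in_flight / capacity_per_subtask)


# Assumed, illustrative inputs (not measured values).
in_flight = required_in_flight(1_000_000, latency_ms=5)
subtasks = subtasks_needed(1_000_000, latency_ms=5, capacity_per_subtask=100)

print(f"requests in flight needed: {in_flight:,.0f}")
print(f"subtasks needed at capacity 100: {subtasks}")
```

With these assumed numbers, about 5,000 requests would have to be in flight
at all times, i.e. at least 50 subtasks at a capacity of 100 each.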



On Thu, Jun 11, 2020 at 2:44 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Arti,
>
> Microbenchmarks for AsyncIO are available [1] and the results are shown in
> [2]. So you can roughly expect 1.6k records/ms per core to be the upper
> limit without any actual I/O. That range should hold for Flink 1.10 and
> the upcoming Flink 1.11. I cannot say much about older versions and you
> didn't specify which one you use. But it shouldn't be an order of magnitude
> different.
>
> The biggest performance improvement will probably be to switch to
> unordered, where results are emitted as soon as they arrive. With ordered,
> the first element that came in needs to be finished before anything is
> emitted. If some elements take longer than others, these slow elements
> quickly become a bottleneck.
>
> If async I/O needs to be ordered, then you need to tweak what you already
> mentioned. Set the DOP (degree of parallelism) to the number of physical
> cores; there is no benefit in going higher. If you indeed use an async HTTP
> client, then the queue size should be a bit higher than the thread pool
> size. The thread pool size will effectively limit the parallelism per
> subtask and you want to saturate that from the Flink side. The thread pool
> size (together with maxConnections) and the request processing time will
> put the hard limit on your application.
>
> In your place, I'd probably consider using more machines rather than more
> cores per machine (it sounded like that is an option). So instead of using
> 10x12 cores, use 15x8 cores. You could measure how much max throughput to
> expect by taking one machine and using a benchmarking tool that increases
> the requests per second on that machine until it hits the limit. Then you
> know how many machines you need at the very least.
>
> Finally, it might also be a good time to review your architecture.
> Microservices are not the best fit for a streaming application. For
> example, if this is a lookup service, it would scale and fit much better if
> all data could be ingested by Flink as an additional data source (e.g.
> Kafka topic). Existing microservices might be converted into such data
> sources with change-data-capture.
>
> [1]
> https://github.com/dataArtisans/flink-benchmarks/blob/master/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
> [2] http://codespeed.dak8s.net:8000/timeline/?ben=asyncWait.ORDERED&env=2
>
> On Wed, Jun 10, 2020 at 10:06 PM Arti Pande <pande.a...@gmail.com> wrote:
>
>> As the Flink Async IO operator is designed for external API or DB calls,
>> are there any specific guidelines / tips for scaling up this operator?
>> Particularly for use cases where incoming events are ingested at a very
>> high speed and the Async IO operator in orderedWait mode cannot keep up
>> with that speed (although the target API endpoint it is calling is load
>> tested to provide much higher throughput with very minimal latency).
>> In our case adding the Async IO operator to the pipeline *reduced the
>> throughput by 88% to 90%*. This is a huge performance hit!
>>
>> We tried a couple of things:
>>
>>    1. Increasing the async buffer capacity parameter, thereby
>>    increasing the number of concurrent requests that are waiting for a
>>    response at any given point in time. This proved counter-productive
>>    beyond a very small number like 50 or 100.
>>    2. Increasing the operator parallelism. This does not help much as
>>    the number of cores on our machines is limited (8 or 12).
>>    3. Tuning the AsyncHTTPClient configuration (keepAlive=true,
>>    maxConnections, maxConnectionsPerHost) and the size of the
>>    FixedThreadPool used by the Listener of its Future. Again without much
>>    improvement.
>>
>> Our observation is that although the Async IO operator works on one stream
>> element at a time, the operator and its underlying HTTP client are
>> multithreaded and need machines with more cores for high-speed stream
>> processing. If the only machines available for this kind of application
>> have 8 to 16 cores, we face challenges in meeting the required throughput
>> and latency SLAs.
>>
>> Are there any micro-benchmarks or tuning guidelines for using Async IO for
>> high-speed stream processing, so we know how much throughput to expect from
>> it?
>>
>> Thanks & regards,
>> Arti
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
