We are having trouble scaling up Flink to execute a collection of SQL
queries on a yarn cluster. Has anyone run this kind of workload on a
cluster? Any tips on how to get past this issue?

With a high number of Flink SQL queries (100 instances of the query at the
bottom of this message), the Flink command line client fails with a
“JobManager did not respond within 600000 ms” on a Yarn cluster. JobManager
logs has nothing after the last TaskManager started indicating its hung
(creating the ExecutionGraph?). This configuration of queries works as a
standalone program locally. I can also successfully launch and process 2
instances of the query in cluster mode.

When attempting 10 query instances in cluster mode, we are able to submit
but the job errors out with “Insufficient number of network buffers:
required 725, but only 135 available. The total number of network buffers
is currently set to 61035 of 32768 bytes each. ”. Though surprisingly with
a query count of 1, 15000 is all the network buffers that are needed. So it
seems like the network buffer count quickly scales with the number of

Note: Each Row in structStream contains 515 columns (very sparse table,
>500 are null for each row) including a column that has the raw message.

In the YARN cluster we specify 18GB for TaskManager, 18GB for the
JobManager, 5 slots and parallelism of 725 (the number of partitions in our
Kafka source).

The query is a simple filter and aggregation:

select count(*), 'idnumber' as criteria, Environment,
CollectedTimestamp, EventTimestamp, RawMsg, Source \n" +
        "from structStream \n" +
        "where Environment='MyEnvironment' and Rule='MyRule' and
LogType='MyLogType' and Outcome='Success'\n" +
        "group by tumble(proctime, INTERVAL '1' SECOND), Environment,
CollectedTimestamp, EventTimestamp, RawMsg, Source"

The code is included in https://issues.apache.org/jira/browse/FLINK-9166




Reply via email to