Hi Min,

I think the pool size is per parallel sink task, i.e., it should be independent of the parallelism of the sink operator.

From my understanding, a pool size of 5 should be fine if the maximum number of concurrent checkpoints is 1. Running out of connections would mean that there are 5 in-flight checkpoints that were not completed, which seems a lot to me (given that the sink is probably at the end of the program).
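You set the pool size when you create the producer. A minimal sketch, assuming the FlinkKafkaProducer011 connector (the topic name, broker address, and pool size of 10 here are placeholders, not recommendations):

    import java.util.Optional;
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
    // EXACTLY_ONCE uses Kafka transactions; the transaction timeout must
    // be long enough to cover the expected checkpoint duration.
    producerProps.setProperty("transaction.timeout.ms", "900000");

    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "my-topic",                                               // placeholder topic
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerProps,
            Optional.empty(),                                         // default partitioner
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
            10);                                                      // kafkaProducersPoolSize, default is 5

With maxConcurrentCheckpoints set to 1, the pool should only run dry if checkpoints keep failing or timing out before their transactions are committed, so it is worth checking for failed checkpoints before increasing the size.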
If I remember correctly, Piotr (in CC) was working on the exactly-once feature of the Kafka producer. Maybe he can help.

Best, Fabian

On Mon, Apr 8, 2019 at 14:43, <min....@ubs.com> wrote:

> Hi,
>
> I keep getting exceptions
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Too many
> ongoing snapshots. Increase kafka producers pool size or decrease number of
> concurrent checkpoints."
>
> I understand that DEFAULT_KAFKA_PRODUCERS_POOL_SIZE is 5 and I need to
> increase this size. What considerations should I take into account when
> increasing it? What is a size for a normal setting, e.g. 32?
>
> I run with a parallelism of 16 and a checkpoint setting like this:
>
> public static void setup(StreamExecutionEnvironment env) {
>     env.enableCheckpointing(2_000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1_000);
>     env.getCheckpointConfig().setCheckpointTimeout(60_000);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>     env.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE / 64));
>     // env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> }
>
> Regards,
>
> Min