Danny Cranmer created FLINK-31041:
-------------------------------------

             Summary: Race condition in DefaultScheduler results in memory leak and busy loop
                 Key: FLINK-31041
                 URL: https://issues.apache.org/jira/browse/FLINK-31041
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.16.1, 1.15.3
            Reporter: Danny Cranmer
             Fix For: 1.17.0, 1.15.4, 1.16.2
h4. Context

When a job creates multiple sources that use the {{SourceCoordinator}} (FLIP-27), there is a failure race condition that results in:
* Memory leak of {{ExecutionVertexVersion}} instances
* Busy loop constantly trying to restart the job
* Restart strategy not being respected

This results in the Job Manager becoming unresponsive.

h4. Reproduction Steps

This can be reproduced by a job that creates multiple sources that fail in the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}}s trying to load a non-existent certificate from the file system and throwing a {{FileNotFoundException}}. Here is a simple job to reproduce (BE WARNED: running this locally will lock up your IDE):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
        10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));

KafkaSource<String> source = KafkaSource.<String>builder()
        .setProperty("security.protocol", "SASL_SSL")
        // SSL configurations
        // Configure the path of truststore (CA) provided by the server
        .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
        .setProperty("ssl.truststore.password", "test1234")
        // Configure the path of keystore (private key) if client authentication is required
        .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
        .setProperty("ssl.keystore.password", "test1234")
        // SASL configurations
        // Set SASL mechanism as SCRAM-SHA-256
        .setProperty("sasl.mechanism", "SCRAM-SHA-256")
        // Set JAAS configurations
        .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
        .setBootstrapServers("http://localhost:3456")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
        .mapToObj(i -> env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
                .keyBy(s -> s.charAt(0))
                .map(s -> s))
        .collect(Collectors.toList());

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
        .keyBy(s -> s.charAt(0))
        .union(sources.toArray(new SingleOutputStreamOperator[] {}))
        .print();

env.execute("test job");
{code}

h4. Root Cause

The {{OperatorCoordinatorHolder}} already has a [debounce mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609], but the {{DefaultScheduler}} does not, and it processes failures from many {{OperatorCoordinatorHolder}} instances.

h4. Fix

I have managed to fix this and will open a PR, but I would need feedback from people who understand this code better than me!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
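To illustrate the debounce idea referenced under Root Cause, here is a minimal, hypothetical sketch of coalescing concurrent failure reports into a single restart. The class and method names ({{RestartDebouncer}}, {{onFailure}}, {{onRestartCompleted}}) are illustrative assumptions, not Flink's actual scheduler code:

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not Flink code): while a restart is pending,
 * further failure reports are debounced instead of each scheduling
 * another restart.
 */
class RestartDebouncer {

    private final AtomicBoolean restartPending = new AtomicBoolean(false);
    private final AtomicInteger restartsTriggered = new AtomicInteger(0);

    /** Called once per failing source coordinator. */
    void onFailure() {
        // compareAndSet ensures exactly one restart is scheduled no matter
        // how many sources fail concurrently; the rest are debounced.
        if (restartPending.compareAndSet(false, true)) {
            restartsTriggered.incrementAndGet();
            // ... the actual restart would be scheduled asynchronously here ...
        }
    }

    /** Called when the restart has completed, re-arming the guard. */
    void onRestartCompleted() {
        restartPending.set(false);
    }

    int restartsTriggered() {
        return restartsTriggered.get();
    }
}
{code}

With 32 failing sources, all 32 {{onFailure}} calls would collapse into one restart instead of 32, which is the behavior the missing debounce in the scheduler would provide.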