I'd suggest to check that shutdown() in close() always completes:
@Override
public void close() {
this.redisson.shutdown();
log.info(String.format("Shut down redisson instance in close
method, RedissonRxClient shutdown is %s", redisson.isShutdown()));
}
maybe by logging on open and then comparing the counts.
It can be that the shutdown is interrupted, times out, or not called
(as a result of a bug).
Probably, it also makes sense to add isShuttingDown() to see whether
shutdown has completed.
Regards,
Roman
On Thu, Sep 23, 2021 at 4:29 AM a773807943 wrote:
>
>
>
> I encountered a problem in the process of integrating Flink and Redisson.
> When the task encounters abnormalities and keeps retries, it will cause the
> number of Redis Clients to increase volatility (sometimes the number
> increases, sometimes the number decreases, but the overall trend is growth).
> Even if I shutdown the Redisson Instance by overwriting the close function ,
> the number of Redis-Clients cannot be prevented from continuing to grow, and
> eventually the number of Clients will reach the upper limit and an error will
> be thrown. Moreover, this situation only occurs in the Flink cluster
> operation mode, and the number of Redis-Clients will remain stable in the
> local mode. The test code is below. I wonder if you can provide specific
> reasons and solutions for this situation, thank you.
>
>
> flink version:1.13.2
> redisson version:3.16.1
>
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
>
>
> import java.util.Properties;
> import java.util.Random;
>
>
> public class ExceptionTest {
> public static void main(String[] args) throws Exception{
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> env.enableCheckpointing(1000 * 60);
> DataStream mock = createDataStream(env);
> mock.keyBy(x - > 1)
> .process(new ExceptionTestFunction())
> .uid("batch-query-key-process")
> .filter(x- >x!=null)
> .print();
> env.execute("Exception-Test");
> }
>
>
> private static DataStream
> createDataStream(StreamExecutionEnvironment env) {
> String topic = "test_topic_xhb03";
> Properties test = new Properties();
> test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
> test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
>
>
> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer >(topic, new SimpleStringSchema(), test);
> consumer.setStartFromLatest();
>
>
> DataStream source = env.addSource(consumer);
> return source;
> }
> }
>
>
>
>
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.redisson.Redisson;
> import org.redisson.api.RedissonRxClient;
> import org.redisson.config.Config;
>
>
> @Slf4j
> public class ExceptionTestFunction extends KeyedProcessFunction String, String > {
> private RedissonRxClient redisson;
>
>
> @Override
> public void close() {
> this.redisson.shutdown();
> log.info(String.format("Shut down redisson instance in close method,
> RedissonRxClient shutdown is %s", redisson.isShutdown()));
>
>
> }
>
> @Override
> public void open(Configuration parameters) {
> String prefix = "redis://";
> Config config = new Config();
> config.useSingleServer()
> .setClientName("xhb-redisson-main")
> .setTimeout(5000)
> .setConnectTimeout(1)
> .setConnectionPoolSize(4)
> .setConnectionMinimumIdleSize(2)
> .setIdleConnectionTimeout(1)
> .setAddress("127.0.0.1:6379")
> .setDatabase(0)
> .setPassword(null);
> this.redisson = Redisson.create(config).rxJava();
> }
>
>
> @Override
> public void processElement(String value, Context ctx, Collector
> out) throws Exception {
> throw new NullPointerException("Null Pointer in ProcessElement");
> }
> }