I'd suggest checking that shutdown() in close() always completes:
@Override
public void close() {
    this.redisson.shutdown();
    log.info(String.format(
            "Shut down redisson instance in close method, RedissonRxClient shutdown is %s",
            redisson.isShutdown()));
}
For example, you could also log in open() and then compare the number of
open and close messages.

The shutdown may be interrupted, may time out, or may not be called at all
(as a result of a bug).
It probably also makes sense to log isShuttingDown() to see whether the
shutdown has completed.
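
To make the counting idea concrete, here is a minimal sketch. It does not
depend on Redisson or Flink: ShutdownAware and ClientLifecycleTracker are
hypothetical names I made up for illustration, and ShutdownAware only mirrors
the three lifecycle calls mentioned in this thread (shutdown(), isShutdown(),
isShuttingDown()).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the lifecycle calls discussed in this thread. */
interface ShutdownAware {
    void shutdown();
    boolean isShutdown();
    boolean isShuttingDown();
}

/** Counts opened clients and clients whose shutdown was observed to complete. */
final class ClientLifecycleTracker {
    private final AtomicInteger opened = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();

    /** Call at the end of open(), after the client was created. Returns a log line. */
    String onOpen() {
        return "opened=" + opened.incrementAndGet() + " completed=" + completed.get();
    }

    /** Call from close(): shuts the client down and records whether it completed. */
    String onClose(ShutdownAware client) {
        boolean done = false;
        boolean shuttingDown = false;
        if (client != null) { // open() may have failed before the client existed
            client.shutdown();
            done = client.isShutdown();
            shuttingDown = client.isShuttingDown();
        }
        if (done) {
            completed.incrementAndGet();
        }
        return "opened=" + opened.get() + " completed=" + completed.get()
                + " isShutdown=" + done + " isShuttingDown=" + shuttingDown;
    }

    /** Clients that were opened but never seen to shut down completely. */
    int outstanding() {
        return opened.get() - completed.get();
    }
}
```

In the job, one would presumably keep a single tracker in a static field, wrap
the RedissonRxClient in a small ShutdownAware adapter, and log the returned
lines from open() and close(). If outstanding() keeps growing across restarts,
some shutdowns are either not called or not completing. Note that a static
field only survives restarts if the class is not reloaded between attempts;
otherwise comparing the log lines themselves is the safer check.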

Regards,
Roman

On Thu, Sep 23, 2021 at 4:29 AM a773807943 <a773807...@gmail.com> wrote:
>
>
>
> I ran into a problem while integrating Flink with Redisson. When the task
> fails and keeps being retried, the number of Redis clients fluctuates
> (sometimes it increases, sometimes it decreases, but the overall trend is
> growth). Even though I shut down the Redisson instance by overriding the
> close function, the number of Redis clients keeps growing; eventually it
> reaches the upper limit and an error is thrown. Moreover, this only happens
> when running on a Flink cluster; in local mode the number of Redis clients
> remains stable. The test code is below. Could you explain the cause of this
> behaviour and suggest a solution? Thank you.
>
>
> Flink version: 1.13.2
> Redisson version: 3.16.1
>
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
>
>
> import java.util.Properties;
> import java.util.Random;
>
>
> public class ExceptionTest {
>     public static void main(String[] args) throws Exception{
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>         env.enableCheckpointing(1000 * 60);
>         DataStream<String> mock = createDataStream(env);
>         mock.keyBy(x -> 1)
>                 .process(new ExceptionTestFunction())
>                 .uid("batch-query-key-process")
>                 .filter(x -> x != null)
>                 .print();
>         env.execute("Exception-Test");
>     }
>
>
>     private static DataStream<String> createDataStream(StreamExecutionEnvironment env) {
>         String topic = "test_topic_xhb03";
>         Properties test = new Properties();
>         test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
>         test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
>
>
>         FlinkKafkaConsumer<String> consumer =
>                 new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), test);
>         consumer.setStartFromLatest();
>
>
>         DataStream<String> source = env.addSource(consumer);
>         return source;
>     }
> }
>
>
>
>
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.redisson.Redisson;
> import org.redisson.api.RedissonRxClient;
> import org.redisson.config.Config;
>
>
> @Slf4j
> public class ExceptionTestFunction extends KeyedProcessFunction<Integer, String, String> {
>     private RedissonRxClient redisson;
>
>
>     @Override
>     public void close() {
>         this.redisson.shutdown();
>         log.info(String.format(
>                 "Shut down redisson instance in close method, RedissonRxClient shutdown is %s",
>                 redisson.isShutdown()));
>
>
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         String prefix = "redis://";
>         Config config = new Config();
>         config.useSingleServer()
>                 .setClientName("xhb-redisson-main")
>                 .setTimeout(5000)
>                 .setConnectTimeout(10000)
>                 .setConnectionPoolSize(4)
>                 .setConnectionMinimumIdleSize(2)
>                 .setIdleConnectionTimeout(10000)
>                 // Redisson expects the address to carry the redis:// scheme
>                 .setAddress(prefix + "127.0.0.1:6379")
>                 .setDatabase(0)
>                 .setPassword(null);
>         this.redisson = Redisson.create(config).rxJava();
>     }
>
>
>     @Override
>     public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
>         throw new NullPointerException("Null Pointer in ProcessElement");
>     }
> }
