Re: Resource leak would happen if exception thrown when flink redisson

2021-09-23 Thread Roman Khachatryan
I'd suggest to check that shutdown() in close() always completes:
@Override
public void close() {
this.redisson.shutdown();
log.info(String.format("Shut down redisson instance in close
method, RedissonRxClient shutdown is %s", redisson.isShutdown()));
}
maybe by logging on open and then comparing the counts.

It can be that the shutdown is interrupted, times out, or not called
(as a result of a bug).
Probably, it also makes sense to add isShuttingDown() to see whether
shutdown has completed.

Regards,
Roman

On Thu, Sep 23, 2021 at 4:29 AM a773807943  wrote:
>
>
>
> I encountered a problem in the process of integrating Flink and Redisson. 
> When the task encounters abnormalities and keeps retries, it will cause the 
> number of Redis Clients to increase volatility (sometimes the number 
> increases, sometimes the number decreases, but the overall trend is growth). 
> Even if I shutdown the Redisson Instance by overwriting the close function , 
> the number of Redis-Clients cannot be prevented from continuing to grow, and 
> eventually the number of Clients will reach the upper limit and an error will 
> be thrown. Moreover, this situation only occurs in the Flink cluster 
> operation mode, and the number of Redis-Clients will remain stable in the 
> local mode. The test code is below. I wonder if you can provide specific 
> reasons and solutions for this situation, thank you.
>
>
> flink version:1.13.2
> redisson version:3.16.1
>
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
>
>
> import java.util.Properties;
> import java.util.Random;
>
>
> public class ExceptionTest {
> public static void main(String[] args) throws Exception{
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> env.enableCheckpointing(1000 * 60);
> DataStream mock = createDataStream(env);
> mock.keyBy(x - > 1)
> .process(new ExceptionTestFunction())
> .uid("batch-query-key-process")
> .filter(x- >x!=null)
> .print();
> env.execute("Exception-Test");
> }
>
>
> private static DataStream 
> createDataStream(StreamExecutionEnvironment env) {
> String topic = "test_topic_xhb03";
> Properties test = new Properties();
> test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
> test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");
>
>
> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer >(topic, new SimpleStringSchema(), test);
> consumer.setStartFromLatest();
>
>
> DataStream source = env.addSource(consumer);
> return source;
> }
> }
>
>
>
>
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.redisson.Redisson;
> import org.redisson.api.RedissonRxClient;
> import org.redisson.config.Config;
>
>
> @Slf4j
> public class ExceptionTestFunction extends KeyedProcessFunction String, String > {
> private RedissonRxClient redisson;
>
>
> @Override
> public void close() {
> this.redisson.shutdown();
> log.info(String.format("Shut down redisson instance in close method, 
> RedissonRxClient shutdown is %s", redisson.isShutdown()));
>
>
> }
>
> @Override
> public void open(Configuration parameters) {
> String prefix = "redis://";
> Config config = new Config();
> config.useSingleServer()
> .setClientName("xhb-redisson-main")
> .setTimeout(5000)
> .setConnectTimeout(1)
> .setConnectionPoolSize(4)
> .setConnectionMinimumIdleSize(2)
> .setIdleConnectionTimeout(1)
> .setAddress("127.0.0.1:6379")
> .setDatabase(0)
> .setPassword(null);
> this.redisson = Redisson.create(config).rxJava();
> }
>
>
> @Override
> public void processElement(String value, Context ctx, Collector 
> out) throws Exception {
> throw new NullPointerException("Null Pointer in ProcessElement");
> }
> }


Resource leak would happen if exception thrown when flink redisson

2021-09-22 Thread a773807943






I encountered a problem in the process of integrating Flink and Redisson. When the task encounters abnormalities and keeps retries, it will cause the number of Redis Clients to increase volatility (sometimes the number increases, sometimes the number decreases, but the overall trend is growth). Even if I shutdown the Redisson Instance by overwriting the close function , the number of Redis-Clients cannot be prevented from continuing to grow, and eventually the number of Clients will reach the upper limit and an error will be thrown. Moreover, this situation only occurs in the Flink cluster operation mode, and the number of Redis-Clients will remain stable in the local mode. The test code is below. I wonder if you can provide specific reasons and solutions for this situation, thank you.flink version:1.13.2redisson version:3.16.1import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;import java.util.Random;public class ExceptionTest {    public static void main(String[] args) throws Exception{        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);        env.enableCheckpointing(1000 * 60);        DataStream mock = createDataStream(env);        mock.keyBy(x - > 1)                .process(new ExceptionTestFunction())                .uid("batch-query-key-process")                .filter(x- >x!=null)                .print();        env.execute("Exception-Test");    }    private static DataStream createDataStream(StreamExecutionEnvironment env) {        String topic = "test_topic_xhb03";        Properties test = new Properties();        test.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");        test.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(topic, new SimpleStringSchema(), test);        consumer.setStartFromLatest();        DataStream source = env.addSource(consumer);        return source;    }}import lombok.extern.slf4j.Slf4j;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;import org.redisson.Redisson;import org.redisson.api.RedissonRxClient;import org.redisson.config.Config;@Slf4jpublic class ExceptionTestFunction extends KeyedProcessFunction {    private RedissonRxClient redisson;    @Override    public void close() {        this.redisson.shutdown();        log.info(String.format("Shut down redisson instance in close method, RedissonRxClient shutdown is %s", redisson.isShutdown()));    }        @Override    public void open(Configuration parameters) {        String prefix = "redis://";        Config config = new Config();                        config.useSingleServer()                                .setClientName("xhb-redisson-main")                                .setTimeout(5000)                                .setConnectTimeout(1)                                .setConnectionPoolSize(4)                                .setConnectionMinimumIdleSize(2)                                .setIdleConnectionTimeout(1)                                .setAddress("127.0.0.1:6379")                                .setDatabase(0)                                .setPassword(null);        this.redisson = Redisson.create(config).rxJava();    }    @Override    public void processElement(String value, Context ctx, Collector out) throws Exception {        throw new NullPointerException("Null Pointer in ProcessElement");    }}