Hi,

Looking at the code, the job does indeed leak the HBaseClient when it fails over.

Overriding the close() method in HbaseDimensionAsyncFunc and releasing the HBaseClient there should fix it.
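Something like the following should work (a minimal sketch based on the HbaseDimensionAsyncFunc in your mail; it assumes the asynchbase client field is still called client, and uses HBaseClient.shutdown(), which returns a Deferred you can join on):

    @Override
    public void close() throws Exception {
        super.close();
        if (client != null) {
            // shutdown() flushes pending RPCs and releases the Netty
            // channels/selectors that hold the pipe/eventpoll descriptors;
            // join() waits until the shutdown has completed.
            client.shutdown().join();
            client = null;
        }
    }

Flink calls close() when the task fails over or is cancelled, so the old client's selectors and pipes are released before the restarted task opens a new HBaseClient.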

Best,
Weihua


On Wed, Mar 8, 2023 at 4:19 PM aiden <18765295...@163.com> wrote:

> Hi
>   I frequently run into a "too many open files" exception when using async HBase; after the job restarts automatically it fails again right away. The error log is as follows:
> 2023-03-08 16:15:39
> org.jboss.netty.channel.ChannelException: Failed to create a selector.
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
> at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
> at
> org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
> at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
> at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
> at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
> at
> com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Too many open files
> at sun.nio.ch.IOUtil.makePipe(Native Method)
> at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
> at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> at java.nio.channels.Selector.open(Selector.java:227)
> at
> org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
> ... 25 more
>   I monitored the number of file descriptors used by the job and found that after it throws the following error and restarts automatically, the file descriptor count spikes. The error log is:
> java.io.IOException: Could not perform checkpoint 5 for operator async
> wait operator (2/9)#0.
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could
> not complete snapshot 5 for operator async wait operator (2/9)#0. Failure
> reason: Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
> ... 22 more
> Caused by: java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> at
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
> at
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> at
> org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
> ... 33 more
>
> Analyzing the open file descriptors shows that the vast majority look like the following (15.4k out of 15.9k), so I suspect the descriptors from before the restart were never released:
> COMMAND    PID USER   FD      TYPE   DEVICE  SIZE/OFF       NODE NAME
> java    168258 yarn *648u  a_inode     0,10         0       8670 [eventpoll]
> java    168258 yarn *652r     FIFO      0,9       0t0  850723636 pipe
> java    168258 yarn *653w     FIFO      0,9       0t0  850723636 pipe
> java    168258 yarn *654u  a_inode     0,10         0       8670 [eventpoll]
> java    168258 yarn *655r     FIFO      0,9       0t0  850723637 pipe
> java    168258 yarn *656w     FIFO      0,9       0t0  850723637 pipe
> java    168258 yarn *657u  a_inode     0,10         0       8670 [eventpoll]
> java    168258 yarn *658r     FIFO      0,9       0t0  850723638 pipe
> java    168258 yarn *659w     FIFO      0,9       0t0  850723638 pipe
> java    168258 yarn *660u  a_inode     0,10         0       8670 [eventpoll]
> java    168258 yarn *661w     FIFO      0,9       0t0  850723639 pipe
>
> Flink version: 1.16.0
> asynchbase version: 1.8.2
> POM dependency:
>         <dependency>
>             <groupId>org.hbase</groupId>
>             <artifactId>asynchbase</artifactId>
>             <version>1.8.2</version>
>             <exclusions>
>                 <exclusion>
>                     <groupId>org.slf4j</groupId>
>                     <artifactId>*</artifactId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>
> The key code is:
>
>         SingleOutputStreamOperator asyncFunc = AsyncDataStream
>                 .orderedWaitWithRetry(source, new HbaseDimensionAsyncFunc(),
>                         60, TimeUnit.SECONDS, 300, AsyFixedRetry())
>                 .setParallelism(9)
>                 .uid("asyncFunc");
>
> public class HbaseDimensionAsyncFunc extends
>         RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>,
>                 ArrayList<HashMap<String, String>>> {
>     HBaseClient client = null;
>
>     @Override
>     public void open(Configuration configuration) throws Exception {
>         super.open(configuration);
>         log.warn("========== creating HBase client ==========");
>         client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
>     }
>
>     @Override
>     public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o,
>             ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
>         client.get(new GetRequest(HBASE_TABLE, uuid))
>                 .addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
>                     // business logic
>                 });
>     }
> }
>
>
