Hi all,

I'm reading a large number of small files from HDFS in batch mode (about 20
directories, each containing about 3000 files, read with
recursive.file.enumeration=true). Each time, after roughly 200 GB of data
has been read, the job fails with the following exception:

java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
        at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
        at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
        at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
        at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
        at java.io.FilterInputStream.read(Unknown Source)
        at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
        at java.util.zip.CheckedInputStream.read(Unknown Source)
        at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
        at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
        at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
        at java.util.zip.GZIPInputStream.<init>(Unknown Source)
        at java.util.zip.GZIPInputStream.<init>(Unknown Source)
        at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
        at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
        at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
        at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
        ... 5 more
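
For context, the input is set up roughly like the sketch below (simplified:
the real job uses my own MyTextInputFormat, a DelimitedInputFormat subclass,
instead of readTextFile, and the path and class name here are placeholders):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ReadNestedGzJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Enable recursive enumeration so files in nested directories are read.
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);

        // "hdfs:///data" stands in for the actual root directory.
        DataSet<String> lines = env
                .readTextFile("hdfs:///data")
                .withParameters(parameters);

        lines.first(10).print();
    }
}

The .gz files are decompressed transparently by FileInputFormat based on
their extension, which is why GZIPInputStream appears in the trace above.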

Each time, I checked the file in question: it exists and is healthy. Looking
at the TaskManager logs, I found the following exceptions, which suggest the
job is running out of connections:

2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
java.net.SocketException: No buffer space available (maximum connections reached?): connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
at java.io.FilterInputStream.read(Unknown Source)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
at java.util.zip.CheckedInputStream.read(Unknown Source)
at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Unknown Source)
2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.DFSClient - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue. java.net.SocketException: No buffer space available (maximum connections reached?): connect
java.net.SocketException: No buffer space available (maximum connections reached?): connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.Net.connect(Unknown Source)
at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
at java.io.FilterInputStream.read(Unknown Source)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
at java.util.zip.CheckedInputStream.read(Unknown Source)
at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at java.util.zip.GZIPInputStream.<init>(Unknown Source)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Unknown Source)

I inspected the open connections and found that the job opens a very large
number of connections that remain stuck in the CLOSE_WAIT state, which I
guess eventually exhausts the ephemeral port space.
I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager) with a
parallelism of 8. I get the same exception even with the job parallelism set
to 1, and the same exception still occurs after upgrading to Flink 1.1.3.

Any idea what could be the root cause of the problem and how to solve it?
Thank you.

Best,
Yassine
