Hi!

It looks to me like this is the following problem: the decompression streams did not properly forward close() calls to the underlying stream.
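For illustration, the failure mode can be sketched in plain Java (hypothetical classes, not the actual Flink code): a decorating InputStream that does not override close() never closes the stream it wraps, so the underlying connection stays open — which is how sockets pile up over many files.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CloseForwarding {

    // Stand-in for an expensive resource, e.g. an HDFS connection.
    // close() only records that it was called.
    static class TrackedStream extends ByteArrayInputStream {
        boolean closed = false;
        TrackedStream(byte[] buf) { super(buf); }
        @Override public void close() { closed = true; }
    }

    // Buggy decorator: read() delegates, but close() is inherited from
    // InputStream, whose default is a no-op — the wrapped stream is
    // never closed.
    static class LeakyStream extends InputStream {
        final InputStream in;
        LeakyStream(InputStream in) { this.in = in; }
        @Override public int read() throws IOException { return in.read(); }
        // missing: @Override public void close() { in.close(); }
    }

    // Fixed decorator: forwards close() to the wrapped stream.
    static class ForwardingStream extends LeakyStream {
        ForwardingStream(InputStream in) { super(in); }
        @Override public void close() throws IOException { in.close(); }
    }

    public static void main(String[] args) throws IOException {
        TrackedStream leaked = new TrackedStream(new byte[]{1});
        new LeakyStream(leaked).close();
        System.out.println("leaky close() reached inner stream: " + leaked.closed);       // false

        TrackedStream freed = new TrackedStream(new byte[]{1});
        new ForwardingStream(freed).close();
        System.out.println("forwarding close() reached inner stream: " + freed.closed);   // true
    }
}
```

The fix in Flink is essentially the second variant: make the decompression wrapper forward close() so the HDFS input stream underneath is released.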
The fix is in the latest 1.2-SNAPSHOT, but did not make it into version 1.1.3. It is in this pull request: https://github.com/apache/flink/pull/2581

I have also pushed the fix into the latest 1.1-SNAPSHOT branch. If you get the code via "git clone -b release-1.1 https://github.com/apache/flink.git", you will get the same code as the 1.1.3 release, plus the patch for this problem.

Greetings,
Stephan

On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:

> Hi all,
>
> I'm reading a large number of small files from HDFS in batch mode (about
> 20 directories, each containing about 3000 files, with
> recursive.file.enumeration=true). Each time, at about 200 GB of received
> data, my job fails with the following exception:
>
> java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>     at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>     at java.io.FilterInputStream.read(Unknown Source)
>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>     at java.util.zip.CheckedInputStream.read(Unknown Source)
>     at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>     at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>     at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>     at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>     ... 5 more
>
> I checked the file each time, and it exists and is healthy.
> Looking at the taskmanager logs, I found the following exceptions, which
> suggest it is running out of connections:
>
> 2016-10-15 18:20:27,034 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
> java.net.SocketException: No buffer space available (maximum connections reached?): connect
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Unknown Source)
>     at sun.nio.ch.Net.connect(Unknown Source)
>     at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>     at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>     at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>     at java.io.FilterInputStream.read(Unknown Source)
>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>     at java.util.zip.CheckedInputStream.read(Unknown Source)
>     at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>     at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>     at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>     at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>     at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Unknown Source)
> 2016-10-15 18:20:27,034 WARN org.apache.hadoop.hdfs.DFSClient - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue.
> java.net.SocketException: No buffer space available (maximum connections reached?): connect
> java.net.SocketException: No buffer space available (maximum connections reached?): connect
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Unknown Source)
>     at sun.nio.ch.Net.connect(Unknown Source)
>     at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>     at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>     at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>     at java.io.FilterInputStream.read(Unknown Source)
>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>     at java.util.zip.CheckedInputStream.read(Unknown Source)
>     at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>     at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>     at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>     at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>     at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>     at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>     at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>     at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Unknown Source)
>
> I inspected the open connections and found that a very large number of
> connections opened by the job are stuck in the CLOSE_WAIT state, which I
> guess exhausted the ephemeral port space after some time.
> I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager), with a
> parallelism of 8. I got the same exception even with the job parallelism
> set to 1, and the same exception happened after upgrading to Flink 1.1.3,
> too.
>
> Any idea what could be the root cause of the problem and how to solve it?
> Thank you.
>
> Best,
> Yassine