Hi!

Looks to me like this is the following problem: the decompression streams
did not properly forward the "close()" calls, so the underlying HDFS
streams were never released.
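
To illustrate, the bug has roughly this shape (a minimal sketch with
hypothetical class names, not the actual Flink code from the pull request):

    import java.io.IOException;
    import java.io.InputStream;

    // A decorating stream (e.g. wrapping the HDFS input stream) that
    // reads through to the wrapped stream.
    class DecoratingStream extends InputStream {
        private final InputStream wrapped;

        DecoratingStream(InputStream wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public int read() throws IOException {
            return wrapped.read();
        }

        // Without this override, InputStream.close() is a no-op, so the
        // wrapped stream (and its TCP connection) is never released and
        // the socket lingers in CLOSE_WAIT.
        @Override
        public void close() throws IOException {
            wrapped.close(); // forward close() to the wrapped stream
        }
    }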

The fix is in the latest 1.2-SNAPSHOT, but did not make it into version
1.1.3. It is contained in this pull request:
https://github.com/apache/flink/pull/2581

I have pushed the fix into the latest 1.1-SNAPSHOT branch.

If you get the code via "git clone -b release-1.1
https://github.com/apache/flink.git", you will get the same code as the
1.1.3 release, plus the fix for this problem.

Greetings,
Stephan


On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Hi all,
>
> I'm reading a large number of small files from HDFS in batch mode (about
> 20 directories, each containing about 3000 files, read with
> recursive.file.enumeration=true; a sketch of the setup follows the stack
> trace below), and each time, at about 200 GB of received data, my job
> fails with the following exception:
>
> java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>         at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
>         at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>         at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>         at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>         at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
>         at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
>         at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>         at java.io.FilterInputStream.read(Unknown Source)
>         at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>         at java.util.zip.CheckedInputStream.read(Unknown Source)
>         at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>         at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>         at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>         at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>         at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>         at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>         at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>         at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>         at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>         ... 5 more
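>
> For context, the job reads the input roughly like this (a minimal sketch
> with placeholder paths, not the exact job code):
>
>     import org.apache.flink.api.java.DataSet;
>     import org.apache.flink.api.java.ExecutionEnvironment;
>     import org.apache.flink.configuration.Configuration;
>
>     public class ReadNestedGzipFiles {
>         public static void main(String[] args) throws Exception {
>             ExecutionEnvironment env =
>                     ExecutionEnvironment.getExecutionEnvironment();
>
>             // enumerate files in nested directories recursively
>             Configuration parameters = new Configuration();
>             parameters.setBoolean("recursive.file.enumeration", true);
>
>             // .csv.gz files are decompressed transparently based on
>             // the file extension
>             DataSet<String> lines = env.readTextFile("hdfs:///basedir")
>                     .withParameters(parameters);
>
>             lines.first(10).print();
>         }
>     }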
>
> I checked the file each time, and it exists and is healthy. Looking at
> the taskmanager logs, I found the following exceptions, which suggest the
> job is running out of connections:
>
> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
> java.net.SocketException: No buffer space available (maximum connections reached?): connect
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Unknown Source)
> at sun.nio.ch.Net.connect(Unknown Source)
> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
> at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
> at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
> at java.io.FilterInputStream.read(Unknown Source)
> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
> at java.util.zip.CheckedInputStream.read(Unknown Source)
> at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
> at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
> at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
> at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
> at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
> at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
> at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
> at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Unknown Source)
> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.DFSClient - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue. java.net.SocketException: No buffer space available (maximum connections reached?): connect
> java.net.SocketException: No buffer space available (maximum connections reached?): connect
> [stack trace identical to the one above]
>
> I inspected the open connections and found that the job opens a very
> large number of connections that get stuck in the CLOSE_WAIT state, which
> I guess exhausts the ephemeral port space after some time.
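> (For reference, on Windows the stuck connections can be listed with
> "netstat -ano | findstr CLOSE_WAIT".)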
> I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager) with a
> parallelism of 8. I got the same exception even with the job parallelism
> set to 1, and the same exception still happens after upgrading to Flink
> 1.1.3.
>
> Any idea what could be the root cause of the problem and how to solve it?
> Thank you.
>
> Best,
> Yassine
>
