That solved my problem, Thank you!

Best,
Yassine

2016-10-16 19:18 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> Looks to me that this is the following problem: The Decompression Streams
> did not properly forward the "close()" calls.
>
> It is in the lastest 1.2-SNAPSHOT, but did not make it into version 1.1.3.
> The fix is in that pull request: https://github.com/apache/flink/pull/2581
>
> I have pushed the fix into the latest 1.1-SNAPSHOT branch.
>
> If you get the code via "git clone -b release-1.1
> https://github.com/apache/flink.git"; you will get the code that is the
> same as the 1.1.3 release, plus the patch to this problem.
>
> Greetings,
> Stephan
>
>
> On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI <
> y.marzou...@mindlytix.com> wrote:
>
>> Hi all,
>>
>> I'm reading a large number of small files from HDFS in batch mode (about
>> 20 directories, each directory contains about 3000 files, using
>> recursive.file.enumeration=true), and each time, at about 200 GB of
>> received data, my job fails with the following exception:
>>
>> java.io.IOException: Error opening the Input Split
>> hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block:
>> BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313
>> file=/filepath/filename.csv.gz
>>         at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu
>> tFormat.java:693)
>>         at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:424)
>>         at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:47)
>>         at org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:140)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>         at java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not
>> obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313
>> file=/filepath/filename.csv.gz
>>         at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInpu
>> tStream.java:984)
>>         at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputSt
>> ream.java:642)
>>         at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSIn
>> putStream.java:882)
>>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.
>> java:934)
>>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.
>> java:735)
>>         at java.io.FilterInputStream.read(Unknown Source)
>>         at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(
>> HadoopDataInputStream.java:59)
>>         at java.util.zip.CheckedInputStream.read(Unknown Source)
>>         at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>>         at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>>         at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>>         at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>         at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>         at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>>         at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>>         at org.apache.flink.api.common.io.FileInputFormat.decorateInput
>> Stream(FileInputFormat.java:717)
>>         at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu
>> tFormat.java:689)
>>         ... 5 more
>>
>> I checked the file each time and it exists and is healthy. Looking at the
>> taskmanager logs, I found the following exceptions which suggests it is
>> running out of connections:
>>
>> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory
>>                     - I/O error constructing remote block reader.
>> java.net.SocketException: No buffer space available (maximum connections
>> reached?): connect
>> at sun.nio.ch.Net.connect0(Native Method)
>> at sun.nio.ch.Net.connect(Unknown Source)
>> at sun.nio.ch.Net.connect(Unknown Source)
>> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWi
>> thTimeout.java:192)
>> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockR
>> eaderFactory.java:777)
>> at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockRead
>> erFromTcp(BlockReaderFactory.java:694)
>> at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderF
>> actory.java:355)
>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputSt
>> ream.java:673)
>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSIn
>> putStream.java:882)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>> at java.io.FilterInputStream.read(Unknown Source)
>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(
>> HadoopDataInputStream.java:59)
>> at java.util.zip.CheckedInputStream.read(Unknown Source)
>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>> at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>> at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>> at org.apache.flink.api.common.io.FileInputFormat.decorateInput
>> Stream(FileInputFormat.java:717)
>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu
>> tFormat.java:689)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:424)
>> at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:47)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:140)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Unknown Source)
>> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.DFSClient
>>                    - Failed to connect to /x.x.x.x:50010 for block, add to
>> deadNodes and continue. java.net.SocketException: No buffer space available
>> (maximum connections reached?): connect
>> java.net.SocketException: No buffer space available (maximum connections
>> reached?): connect
>> at sun.nio.ch.Net.connect0(Native Method)
>> at sun.nio.ch.Net.connect(Unknown Source)
>> at sun.nio.ch.Net.connect(Unknown Source)
>> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWi
>> thTimeout.java:192)
>> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockR
>> eaderFactory.java:777)
>> at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockRead
>> erFromTcp(BlockReaderFactory.java:694)
>> at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderF
>> actory.java:355)
>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputSt
>> ream.java:673)
>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSIn
>> putStream.java:882)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>> at java.io.FilterInputStream.read(Unknown Source)
>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(
>> HadoopDataInputStream.java:59)
>> at java.util.zip.CheckedInputStream.read(Unknown Source)
>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>> at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>> at org.apache.flink.api.common.io.compression.GzipInflaterInput
>> StreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>> at org.apache.flink.api.common.io.FileInputFormat.decorateInput
>> Stream(FileInputFormat.java:717)
>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInpu
>> tFormat.java:689)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:424)
>> at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(Del
>> imitedInputFormat.java:47)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:140)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Unknown Source)
>>
>> I inspected the open connections, and found that a very large number of
>> connections are opened by the job and stuck on the CLOSE_WAIT status, which
>> I guess exhausted the ephemeral port space after some time.
>> I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager), and
>> using a prallelism of 8. I got the same exception even with a job
>> paralellism set to 1. The same exception happened after upgrading to Flink
>> 1.1.3 too.
>>
>> Any idea what could be the root cause of the problem and how to solve it?
>> Thank you.
>>
>> Best,
>> Yassine
>>
>
>

Reply via email to