Yes, there is a loop that recursively searches for files in the directory, but
that should be fine. The code fails when adding a new InputSplit to an
ArrayList, which is a standard operation.

Oh, I think I found a bug in `addNestedFiles`: in line 546 it does not pick up
the length of the recursively found files. That can result in a returned size
of 0, which causes an unbounded number of InputSplits to be created and added
to the aforementioned ArrayList. Can you change

addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

to

length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

?
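
For illustration, here is a minimal sketch of the pattern I suspect. This is
not the actual Flink source: the signature is simplified (no running-length
parameter, it returns only the bytes found below `path`), and acceptFile()
and LOG are assumed from the surrounding FileInputFormat class.

private long addNestedFiles(Path path, List<FileStatus> files,
        boolean logExcludedFiles) throws IOException {
    final FileSystem fs = path.getFileSystem();
    long length = 0;

    for (FileStatus file : fs.listStatus(path)) {
        if (file.isDir()) {
            // Dropping this return value is the suspected bug: every file
            // below a sub-directory then contributes 0 bytes to the total.
            length += addNestedFiles(file.getPath(), files, logExcludedFiles);
        } else if (acceptFile(file)) {
            files.add(file);
            length += file.getLen();
        } else if (logExcludedFiles && LOG.isDebugEnabled()) {
            LOG.debug("Directory enumeration excluded file " + file.getPath());
        }
    }
    return length;
}

With deeply nested directories and no files directly under the scan root, the
discarded return values add up to a totalLength of 0. The split size derived
from it can then become 0 as well, so the split-creation loop never reduces
the unassigned bytes and keeps appending splits, which matches the
ArrayList.grow -> java.lang.OutOfMemoryError in the trace below.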



On Tue, May 26, 2015 at 2:21 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> I have 10 files... I debugged the code and it seems that there's a loop in
> the FileInputFormat when files are nested deep below the root directory
> of the scan.
>
> On Tue, May 26, 2015 at 2:14 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Flavio,
>>
>> how many files are in the directory?
>> You can count with "find /tmp/myDir | wc -l"
>>
>> Flink running out of memory while creating input splits indicates to me
>> that there are a lot of files in there.
>>
>> On Tue, May 26, 2015 at 2:10 PM, Flavio Pompermaier <pomperma...@okkam.it
>> > wrote:
>>
>>> Hi to all,
>>>
>>> I'm trying to recursively read a directory, but it seems that the
>>> totalLength value in FileInputFormat.createInputSplits() is not
>>> computed correctly...
>>>
>>> I have files organized as:
>>>
>>> /tmp/myDir/A/B/chunk-1.txt
>>> /tmp/myDir/A/B/chunk-2.txt
>>>  ..
>>>
>>> If I try to do the following:
>>>
>>> Configuration parameters = new Configuration();
>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>
>>> env.readTextFile("file:///tmp/myDir").withParameters(parameters).print();
>>>
>>> I get:
>>>
>>> Caused by: org.apache.flink.runtime.JobException: Creating the input
>>> splits caused an error: Java heap space
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:515)
>>> ... 19 more
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>> at java.util.ArrayList.add(ArrayList.java:440)
>>> at
>>> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:503)
>>> at
>>> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
>>> at
>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
>>>
>>> Am I doing something wrong or is it a bug?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>