Yes, there is a loop that recursively searches for files in the directory, but that should be OK. The code fails when adding a new InputSplit to an ArrayList, which is a standard operation.
Oh, I think I found a bug in `addNestedFiles`. It does not pick up the length of the recursively found files in line 546. That can result in a returned size of 0, which causes an unbounded number of InputSplits to be created and added to the aforementioned ArrayList.

Can you change

    addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

to

    length += addNestedFiles(dir.getPath(), files, length, logExcludedFiles);

?

On Tue, May 26, 2015 at 2:21 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I have 10 files. I debugged the code and it seems that there's a loop in
> the FileInputFormat when files are nested far away from the root directory
> of the scan.
>
> On Tue, May 26, 2015 at 2:14 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Flavio,
>>
>> how many files are in the directory?
>> You can count with "find /tmp/myDir | wc -l"
>>
>> Flink running out of memory while creating input splits indicates to me
>> that there are a lot of files in there.
>>
>> On Tue, May 26, 2015 at 2:10 PM, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi to all,
>>>
>>> I'm trying to recursively read a directory but it seems that the
>>> totalLength value in FileInputFormat.createInputSplits() is not
>>> computed correctly.
>>>
>>> I have files organized as:
>>>
>>> /tmp/myDir/A/B/cunk-1.txt
>>> /tmp/myDir/A/B/cunk-2.txt
>>> ..
>>>
>>> If I try to do the following:
>>>
>>> Configuration parameters = new Configuration();
>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>
>>> env.readTextFile("file:////tmp/myDir").withParameters(parameters).print();
>>>
>>> I get:
>>>
>>> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Java heap space
>>>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471)
>>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:515)
>>>     ... 19 more
>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>     at java.util.Arrays.copyOf(Arrays.java:2219)
>>>     at java.util.ArrayList.grow(ArrayList.java:242)
>>>     at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>     at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>     at java.util.ArrayList.add(ArrayList.java:440)
>>>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:503)
>>>     at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51)
>>>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
>>>
>>> Am I doing something wrong or is it a bug?
>>>
>>> Best,
>>> Flavio
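
For reference, here is a minimal sketch of the accumulation problem described at the top of this message: a recursive helper that returns a running length but whose caller drops the returned value. It is not the Flink code. `NestedLengthSketch`, `Entry`, and both `addNestedFiles*` variants are hypothetical stand-ins, and the file sizes (10 and 20 bytes) are made up for illustration. Only the directory layout mirrors the one in the report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NestedLengthSketch {

    /** Hypothetical stand-in for a file system entry; not a Flink or Hadoop class. */
    static final class Entry {
        final String name;
        final long length;          // size in bytes; unused for directories
        final List<Entry> children; // null for plain files

        private Entry(String name, long length, List<Entry> children) {
            this.name = name;
            this.length = length;
            this.children = children;
        }

        static Entry file(String name, long length) {
            return new Entry(name, length, null);
        }

        static Entry dir(String name, Entry... children) {
            return new Entry(name, 0L, Arrays.asList(children));
        }

        boolean isDir() {
            return children != null;
        }
    }

    /** Buggy variant: nested files are collected, but their length is lost. */
    static long addNestedFilesBuggy(Entry dir, List<Entry> files, long length) {
        for (Entry e : dir.children) {
            if (e.isDir()) {
                // The return value is discarded. Java passes 'length' by value,
                // so the caller's total never sees the nested files.
                addNestedFilesBuggy(e, files, length);
            } else {
                files.add(e);
                length += e.length;
            }
        }
        return length;
    }

    /**
     * Fixed variant: the nested result is added to the running total.
     * Each call starts from zero (a choice of this sketch), so the result
     * of a nested call never contains the caller's own total.
     */
    static long addNestedFilesFixed(Entry dir, List<Entry> files) {
        long length = 0L;
        for (Entry e : dir.children) {
            if (e.isDir()) {
                length += addNestedFilesFixed(e, files);
            } else {
                files.add(e);
                length += e.length;
            }
        }
        return length;
    }

    public static void main(String[] args) {
        // Layout from the report: /tmp/myDir/A/B/cunk-*.txt (sizes are assumed).
        Entry root = Entry.dir("myDir", Entry.dir("A", Entry.dir("B",
                Entry.file("cunk-1.txt", 10L),
                Entry.file("cunk-2.txt", 20L))));

        List<Entry> buggyFiles = new ArrayList<>();
        long buggyTotal = addNestedFilesBuggy(root, buggyFiles, 0L);
        System.out.println("buggy: files=" + buggyFiles.size() + " total=" + buggyTotal);

        List<Entry> fixedFiles = new ArrayList<>();
        long fixedTotal = addNestedFilesFixed(root, fixedFiles);
        System.out.println("fixed: files=" + fixedFiles.size() + " total=" + fixedTotal);
    }
}
```

With the assumed sizes, the buggy variant finds both files but reports a total of 0, while the fixed variant reports 30. In this simplified model, a helper that both receives the running total and has its result added back would count that total twice whenever a directory holds files next to subdirectories, which is why the fixed variant here starts every nested call from zero.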