Hello,
I find myself needing to process a large number of files (28M)
stored in a deeply nested folder hierarchy (Pairtree: a multi-level,
hashtable-on-disk-like structure). Here's an example path:
./udel/pairtree_root/31/74/11/11/56/89/39/31741111568939/31741111568939.zip
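For context, the Pairtree layout maps an object identifier to a path by splitting the identifier into two-character segments. This helper is just a hypothetical sketch of that mapping (it assumes an even-length identifier), not part of the actual code:

```scala
// Hypothetical sketch: derive a Pairtree path from an identifier by
// splitting it into two-character segments, with the full identifier
// as the final (encapsulating) directory.
def pairtreePath(id: String): String =
  id.grouped(2).mkString("/") + "/" + id

// pairtreePath("31741111568939")
//   == "31/74/11/11/56/89/39/31741111568939"
```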
I can't scan the entire folder structure ahead of time to build a list of
files and then use sc.parallelize(list, ...) to create an RDD to process,
because traversing the entire hierarchy would take a very long time.
(Also, the folder content is fluid: files are added and deleted
periodically.) I'm thinking I would need to use Spark Streaming, whereby I'd
use something like java.nio.file.Files.walkFileTree(...) to "discover" these
files and "stream" them into Spark as they're found.
What do the experts think? Is there a better way of handling this?
The unit of parallelization is a single file (i.e. the "processing" operates
at the file level).
Here's what I've created so far:
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class DirectoryExplorerReceiver(dir: String,
                                accept: (String, BasicFileAttributes) => Boolean)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    val context = scala.concurrent.ExecutionContext.Implicits.global
    val rootPath = FileSystems.getDefault.getPath(dir)
    // Walk the tree on a background thread so onStart() returns promptly,
    // as the Receiver contract requires.
    context.execute(new Runnable {
      override def run(): Unit =
        Files.walkFileTree(rootPath, new SimpleFileVisitor[Path] {
          override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
            if (isStopped())
              return FileVisitResult.TERMINATE
            if (accept(file.toString, attrs))
              store(file.toString) // push the path into the current batch
            FileVisitResult.CONTINUE
          }
        })
    })
  }

  // The visitor checks isStopped() on every file, so there's nothing to do here.
  override def onStop(): Unit = {}
}
And use it like this:
val ssc = new StreamingContext(conf, Seconds(10))
val zipFiles = ssc.receiverStream(
  new DirectoryExplorerReceiver(
    dataDir,
    (filePath, attrs) =>
      attrs.isRegularFile &&
      filePath.toLowerCase.endsWith(".zip") &&
      attrs.size() > 0
  )
)
val processedData = zipFiles.map(zipFile => doSomethingUseful(zipFile))
ssc.start()
ssc.awaitTermination()
This effectively uses a 10-second batch interval: every file discovered
during an interval becomes a record in that batch's RDD.
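doSomethingUseful is of course application-specific; purely as an illustration, here's a hypothetical stand-in that opens the zip at the given path and counts its entries (the real per-file processing would go where the count happens):

```scala
import java.util.zip.ZipFile

// Hypothetical stand-in for doSomethingUseful: open the archive at the
// given path and return its entry count. Runs on the executors, once
// per discovered file.
def doSomethingUseful(zipPath: String): Int = {
  val zf = new ZipFile(zipPath)
  try zf.size() // number of entries in the archive
  finally zf.close()
}
```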
Thanks for any comments/suggestions.
-Boris