GitHub user tdas opened a pull request:
https://github.com/apache/spark/pull/14987
[SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to
save file names in FileStreamSource
## What changes were proposed in this pull request?
When we create a file stream on a directory that has partitioned subdirs
(e.g. dir/x=y/), ListingFileCatalog.allFiles returns the files in the dir
as a Seq[String] which internally is a Stream[String]. This is because of this
[line|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93],
where a LinkedHashMap.values.toSeq returns a Stream. Then when the
[FileStreamSource|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79]
filters this Stream[String] to remove the seen files, it creates a new
Stream[String] whose filter function has an $outer reference to the
FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String]
causes a NotSerializableException. This happens even if there is just one
file in the dir.
It's important to note that this behavior is different in Scala 2.11. There
is no $outer reference to FileStreamSource, so it does not throw
NotSerializableException. However, with a large sequence of files (tested with
10000 files), it throws StackOverflowError. This is because of how the Stream
class is implemented: it is basically a linked list, and serializing a
long Stream requires *recursively* walking the linked list, which results in a
StackOverflowError.
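The recursion problem can be illustrated without Stream itself. The following is a minimal sketch, not code from this PR: `Node`, `buildChain`, `toArray` and `serializes` are hypothetical names, and `Node` is only a stand-in for a Stream cons cell. It assumes default Java serialization, which follows object references field by field.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a Stream cons cell: each node references the next one.
final class Node(val head: String, val tail: Node) extends Serializable

object LinkedSerializationSketch {
  // Build the chain iteratively so that construction itself does not recurse.
  def buildChain(n: Int): Node = {
    var node: Node = null
    var i = n
    while (i > 0) {
      i -= 1
      node = new Node(s"file-$i", node)
    }
    node
  }

  // Copy the elements into a flat Array, again without recursion.
  def toArray(chain: Node): Array[String] = {
    val buf = ArrayBuffer.empty[String]
    var cur = chain
    while (cur != null) {
      buf += cur.head
      cur = cur.tail
    }
    buf.toArray
  }

  // Returns true if Java serialization completes, false if it overflows the stack.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: StackOverflowError => false
    }
  }
}
```

With a sufficiently long chain and a default JVM thread stack, `serializes(buildChain(n))` is expected to return false, because each node adds several stack frames while the serializer descends into `tail`. The same elements copied with `toArray` are written in a loop, so the depth does not grow with the number of files.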
The right solution is to convert the Seq to an Array before writing to the
log. This PR implements the fix in two ways.
- Changed all uses of HDFSMetadataLog to ensure an Array is used instead of a
Seq
- Added a `require` in HDFSMetadataLog so that it is never used with type
Seq
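The two points above can be sketched as follows. This is only an illustration under assumptions: `GuardedLog` is a hypothetical in-memory class, not Spark's HDFSMetadataLog, and the exact check and message used in the patch may differ.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical in-memory stand-in for a metadata log; not Spark's HDFSMetadataLog.
class GuardedLog[T: ClassTag] {
  // Reject Seq as the metadata type at construction time.
  require(
    implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
    "Do not create a log with type Seq; use Array instead")

  private val batches = mutable.LinkedHashMap.empty[Long, T]

  // Stores the metadata for a batch; returns false if the batch already exists.
  def add(batchId: Long, metadata: T): Boolean =
    if (batches.contains(batchId)) false
    else {
      batches.put(batchId, metadata)
      true
    }

  def get(batchId: Long): Option[T] = batches.get(batchId)
}

object GuardedLogUsage {
  // Callers convert their Seq to an Array before handing it to the log.
  def record(log: GuardedLog[Array[String]], batchId: Long, files: Seq[String]): Boolean =
    log.add(batchId, files.toArray)
}
```

Note that an equality check like this one only rejects a log declared with type Seq. It would not catch a subtype such as List, so the call sites still have to do the `toArray` conversion themselves.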
## How was this patch tested?
Added a unit test that ensures the file stream source can handle
10000 files. Without this fix, the test fails in both Scala 2.10 and 2.11,
with the different failures described above.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-17372
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/14987.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #14987
----
commit 5e7c127d7a79570f4d210f8eaf239bef73a54e24
Author: Tathagata Das <[email protected]>
Date: 2016-09-06T21:57:12Z
SPARK-17372
commit 9bcbb087d2935657a30eb9bc6b52ea6fbed65edf
Author: Tathagata Das <[email protected]>
Date: 2016-09-07T00:22:59Z
Improve unit test to run faster
----