GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/14987

    [SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to 
save file names in FileStreamSource

    ## What changes were proposed in this pull request?
    
    When we create a file stream on a directory that has partitioned subdirs 
(e.g. dir/x=y/), ListingFileCatalog.allFiles returns the files in the dir 
as a Seq[String] that is internally a Stream[String]. This is because of this 
[line|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93],
 where calling values.toSeq on a LinkedHashMap returns a Stream. Then when the 
[FileStreamSource|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79]
 filters this Stream[String] to remove the seen files, it creates a new 
Stream[String], which has a filter function that has a $outer reference to the 
FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] 
throws a NotSerializableException. This happens even if there is just one 
file in the dir.
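
    The laziness described above can be sketched in isolation. This is an 
illustration only, not code from this PR: `SourceSketch` and its members are 
hypothetical stand-ins for FileStreamSource, and an explicit `.toStream` 
stands in for what `values.toSeq` returned in the Scala versions discussed 
here.

```scala
// Hypothetical stand-in for FileStreamSource; note it is not Serializable.
class SourceSketch {
  private val seenFiles = Set("dir/x=1/a.txt")

  // Declared as Seq[String], but the runtime value is a lazy Stream.
  def listFiles(): Seq[String] =
    Seq("dir/x=1/a.txt", "dir/x=1/b.txt", "dir/x=2/c.txt").toStream

  // Filtering a Stream yields another Stream. Its unevaluated tail keeps the
  // predicate closure alive, and that closure may in turn reference `this`.
  def newFilesLazy(): Seq[String] = listFiles().filterNot(seenFiles.contains)

  // Copying into an Array evaluates every element and leaves no closure behind.
  def newFilesStrict(): Array[String] = newFilesLazy().toArray
}
```

    The static type gives no hint of the problem: both methods look like 
ordinary collections to the caller, and only the Array is guaranteed to be 
plain data.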
    
    It's important to note that this behavior is different in Scala 2.11. There 
is no $outer reference to FileStreamSource, so it does not throw 
NotSerializableException. However, with a large sequence of files (tested with 
10000 files), it throws StackOverflowError. This is because of how the Stream 
class is implemented. It is essentially a linked list, and attempting to 
serialize a long Stream requires *recursively* walking that linked list, which 
results in a StackOverflowError.
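
    As a minimal sketch of the alternative, the snippet below copies a long 
Stream into an Array[String] and round-trips it through plain Java 
serialization. The object name, helper methods and file names are 
hypothetical, and this is not the code path HDFSMetadataLog uses; it only 
illustrates that a flat array serializes without walking a chain of linked 
cells.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ArraySerializationSketch {
  // Java-serialize a value into a byte array.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  // Read a value back from its serialized bytes.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Hypothetical file names; 10000 matches the count mentioned in this PR.
  val files: Seq[String] = (1 to 10000).map(i => s"dir/x=1/file-$i.txt").toStream

  // Convert to an Array before serializing, then read it back.
  val restored: Array[String] = deserialize[Array[String]](serialize(files.toArray))
}
```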
    
    The right solution is to convert the seq to an array before writing to the 
log. This PR implements this fix in two ways.
    - Changed all uses of HDFSMetadataLog to ensure Array is used instead of 
Seq
    - Added a `require` in HDFSMetadataLog so that it is never used with type 
Seq
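
    The kind of guard meant by the second item can be sketched as follows. 
`MetadataLogSketch` is a hypothetical class, not the real HDFSMetadataLog, 
and the error message is illustrative; the sketch assumes the check is done 
by comparing the runtime class of the type parameter against Seq.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for HDFSMetadataLog; only the type check is modeled.
class MetadataLogSketch[T: ClassTag] {
  private val runtimeClass = implicitly[ClassTag[T]].runtimeClass

  // Fail fast at construction time when T is declared as Seq.
  require(runtimeClass != classOf[Seq[_]],
    "Type T should not be Seq[_]; use Array[_] instead")
}
```

    With this sketch, `new MetadataLogSketch[Array[String]]()` constructs 
normally, while `new MetadataLogSketch[Seq[String]]()` throws an 
IllegalArgumentException. A check of this shape only catches T declared 
exactly as Seq, not subtypes such as List.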
    
    ## How was this patch tested?
    
    Added a unit test that ensures the file stream source can handle 10000 
files. Without this fix, the test fails in both Scala 2.10 and 2.11, with the 
different failures described above.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-17372

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14987
    
----
commit 5e7c127d7a79570f4d210f8eaf239bef73a54e24
Author: Tathagata Das <[email protected]>
Date:   2016-09-06T21:57:12Z

    SPARK-17372

commit 9bcbb087d2935657a30eb9bc6b52ea6fbed65edf
Author: Tathagata Das <[email protected]>
Date:   2016-09-07T00:22:59Z

    Improve unit test to run faster

----

