GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/16585

    [SPARK-19223][SQL][PySpark] Fix InputFileBlockHolder for datasources which 
are based on HadoopRDD or NewHadoopRDD

    ## What changes were proposed in this pull request?
    
    For some datasources which are based on HadoopRDD or NewHadoopRDD, such as 
spark-xml, InputFileBlockHolder doesn't work with Python UDF.
    
    The method to reproduce it is, running the following codes with 
`bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`:
    
        from pyspark.sql.functions import udf,input_file_name
        from pyspark.sql.types import StringType
        from pyspark.sql import SparkSession
    
        def filename(path):
            return path
    
        session = SparkSession.builder.appName('APP').getOrCreate()
    
        session.udf.register('sameText',filename)
        sameText = udf(filename, StringType())
    
        df = session.read.format('xml').load('a.xml', 
rowTag='root').select('*',input_file_name().alias('file'))
        df.select('file').show() # works
        df.select(sameText(df['file'])).show()   # returns empty content
    
    The issue is because in `HadoopRDD` and `NewHadoopRDD` we set the file 
block's info in `InputFileBlockHolder` before the returned iterator begins 
consuming. `InputFileBlockHolder` will record this info into thread local 
variable. When running Python UDF in batch, we set up another thread to consume 
the iterator from child plan's output rdd, so we can't read the info back in 
another thread.
    
    To fix this, we have to set the info in `InputFileBlockHolder` after the 
iterator begins consuming. So the info can be read in correct thread.
    
    ## How was this patch tested?
    
    Manual test with above example codes for spark-xml package on pyspark: 
`bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`.
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 fix-inputfileblock-hadooprdd

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16585.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16585
    
----
commit 5fd215f7b6012c818f39402fff6cae8d19338b90
Author: Liang-Chi Hsieh <[email protected]>
Date:   2017-01-14T06:23:32Z

    Fix InputFileBlock for HadoopRDD.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to