HyukjinKwon commented on issue #24946: [SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in EpochTracker (to support Python UDFs)
URL: https://github.com/apache/spark/pull/24946#issuecomment-505227175
 
 
   It works after this PR, as shown below:
   
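   ```python
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import col, pandas_udf, PandasUDFType

   # `spark` already exists in the pyspark shell; create it in a standalone script.
   spark = SparkSession.builder.getOrCreate()

   # Identity SCALAR_ITER Pandas UDF: yields each input batch unchanged.
   @pandas_udf("int", PandasUDFType.SCALAR_ITER)
   def the_udf(iterator):
       for col1_batch in iterator:
           yield col1_batch

   # Continuous-processing query: rate source -> Python UDF -> console sink.
   query = spark \
       .readStream \
       .format("rate") \
       .load() \
       .withColumn("foo", the_udf(col("value"))) \
       .writeStream \
       .format("console") \
       .trigger(continuous="5 second") \
       .start()

   query.awaitTermination()
   ```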
   
   
   Before:
   
   ```
   ...
   Caused by: java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:366)
        at scala.None$.get(Option.scala:364)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousQueuedDataReader.next(ContinuousQueuedDataReader.scala:116)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD$$anon$1.getNext(ContinuousDataSourceRDD.scala:93)
        at org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD$$anon$1.getNext(ContinuousDataSourceRDD.scala:91)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:701)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
   ...
   ```
   
   After:
   
   ```
   ...
   -------------------------------------------
   Batch: 3
   -------------------------------------------
   +--------------------+-----+---+
   |           timestamp|value|foo|
   +--------------------+-----+---+
   |2019-06-25 09:05:...|    9|  9|
   |2019-06-25 09:05:...|    2|  2|
   |2019-06-25 09:05:...|   10| 10|
   |2019-06-25 09:05:...|   11| 11|
   |2019-06-25 09:05:...|    5|  5|
   |2019-06-25 09:05:...|    3|  3|
   |2019-06-25 09:05:...|    1|  1|
   |2019-06-25 09:05:...|    6|  6|
   |2019-06-25 09:05:...|    4|  4|
   |2019-06-25 09:05:...|    8|  8|
   |2019-06-25 09:05:...|    7|  7|
   |2019-06-25 09:05:...|    0|  0|
   +--------------------+-----+---+
   ...
   ```
   
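   This failed because the current epoch could not be referenced from the writer thread that sends input rows to the Python process: the epoch was held in a thread-local set on the task thread, and the writer thread did not inherit it. Each UDF is executed per epoch, so that thread needs access to the current epoch.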
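   The failure mode can be illustrated with a rough Python analogy (not Spark code). Python's `threading.local` behaves like a plain Java `ThreadLocal`: a value set on the task thread is invisible to a thread it starts. The sketch assumes hypothetical names (`plain_epoch`, `current_epoch`, `InheritableLocal`, `InheritingThread`) and mimics `InheritableThreadLocal` by copying the parent's value when the child thread is constructed.

   ```python
   import threading

   # Analogy only, not Spark code. `plain_epoch` mimics a plain Java ThreadLocal:
   # values set in one thread are not visible in threads it starts.
   plain_epoch = threading.local()


   class InheritableLocal:
       """Hypothetical Python stand-in for Java's InheritableThreadLocal."""

       def __init__(self):
           self._local = threading.local()

       def get(self):
           return getattr(self._local, "value", None)

       def set(self, value):
           self._local.value = value


   # Hypothetical "current epoch" holder, in the spirit of EpochTracker.
   current_epoch = InheritableLocal()


   class InheritingThread(threading.Thread):
       """Thread that starts with a copy of its creator's current_epoch."""

       def __init__(self, *args, **kwargs):
           super().__init__(*args, **kwargs)
           # Snapshot the parent's value at construction time, as Java does
           # for InheritableThreadLocal when a child Thread is created.
           self._inherited_epoch = current_epoch.get()

       def run(self):
           current_epoch.set(self._inherited_epoch)
           super().run()


   def read_in_writer_thread(thread_cls, getter):
       """Start a 'writer' thread and return the epoch it observes."""
       seen = []
       writer = thread_cls(target=lambda: seen.append(getter()))
       writer.start()
       writer.join()
       return seen[0]


   def demo():
       # The "task thread" sets the epoch in both holders.
       plain_epoch.value = 3
       current_epoch.set(3)
       plain = read_in_writer_thread(
           threading.Thread, lambda: getattr(plain_epoch, "value", None))
       inherited = read_in_writer_thread(InheritingThread, current_epoch.get)
       return plain, inherited


   print(demo())  # (None, 3)
   ```

   The plain thread sees `None` (comparable to the empty `Option` behind `None.get` in the trace), while the inheriting thread sees epoch 3.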
