ueshin commented on code in PR #42420:
URL: https://github.com/apache/spark/pull/42420#discussion_r1291940456
##########
python/pyspark/worker.py:
##########
@@ -621,22 +663,30 @@ def verify_result(result):
return lambda *a: map(lambda res: (res, arrow_return_type),
map(verify_result, f(*a)))
- eval = wrap_arrow_udtf(getattr(udtf, "eval"), return_type)
-
- if hasattr(udtf, "terminate"):
- terminate = wrap_arrow_udtf(getattr(udtf, "terminate"),
return_type)
- else:
- terminate = None
+ udtf_state.eval = wrap_arrow_udtf(getattr(udtf_state.udtf, "eval"),
return_type)
+ udtf_state.set_terminate(wrap_arrow_udtf, return_type)
def mapper(_, it):
try:
for a in it:
# The eval function yields an iterator. Each element
produced by this
# iterator is a tuple in the form of (pandas.DataFrame,
arrow_return_type).
- yield from eval(*[a[o] for o in arg_offsets])
+ arguments = [a[o] for o in arg_offsets]
+ changed_partitions = check_partition_boundaries(arguments)
+ if changed_partitions:
+ # Call 'terminate' on the UDTF class instance, if
applicable.
+ # Then destroy the UDTF class instance and create a
new one.
+ if udtf_state.terminate is not None:
+ yield from udtf_state.terminate()
+ create_udtf_classs_instance(return_type)
Review Comment:
This call needs the same update as above: the function name has a typo (`create_udtf_classs_instance` → `create_udtf_class_instance`), and the `return_type` argument should be dropped to match the new signature.
```suggestion
create_udtf_class_instance()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]