HeartSaVioR commented on pull request #31296:
URL: https://github.com/apache/spark/pull/31296#issuecomment-766276315


   I understand this functionality is missing in Structured Streaming (SS). 
There is a workaround along the lines of foreachBatch -> toRDD -> pipe, but 
streaming operations can't be added after calling pipe. So I agree it would be 
better to close the gap one way or another.
   
   I feel the default serialization logic in PipedRDD is fragile and not well 
documented. (This actually makes me wonder: is PipedRDD widely adopted?) Is 
there any documentation or mention that T.toString is used for serialization, 
and that it doesn't escape line breaks, so a multi-line string is printed as 
multiple lines without any guard? The default implementation is too naive, and 
even for primitive types it's not hard to find a hole. There's a parameter to 
customize the serialization, and we can add it here as well, which makes me 
less concerned, but the default should still be reasonable, and its 
limitations, if any, should be well explained.
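
   To illustrate the line-break concern, here is a minimal Python sketch. It 
is not PipedRDD's code; it only mimics the behavior described above (one 
`str(element)` per line, no escaping). The names `ECHO` and `pipe_lines` are 
made up for this example, and the child process is a stand-in for `cat`.

```python
import subprocess
import sys

# Hypothetical child process: copies stdin to stdout unchanged (like `cat`).
ECHO = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"]


def pipe_lines(elements, command):
    """Write str(element) plus a newline for each element; return output lines."""
    stdin_text = "".join(str(e) + "\n" for e in elements)
    result = subprocess.run(
        command, input=stdin_text, capture_output=True, text=True, check=True
    )
    return result.stdout.splitlines()


records = ["single line", "two\nlines"]
echoed = pipe_lines(records, ECHO)

# Two elements went in, but the child saw three lines, so the element
# boundaries can no longer be recovered from the output.
print(len(records), len(echoed))
```

   With naive line-based serialization the receiver has no way to tell that 
"two" and "lines" belonged to the same element.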
   
   Adding this to the top-level API would be the easiest way, as it simply 
leverages PipedRDD, and the diff of this PR, excluding tests, is very small. 
This is a major upside.
   
   The major downside is that this only works if end users intend to send all 
columns as input and use the output as a single string column. Suppose end 
users have 10 columns in their DataFrame and want to pipe only one column 
while retaining the other 9 columns for the next operation (so 10 columns 
still remain, including the output of pipe). How would they do that?
   
   And as I said above, I don't think they'll be able to understand the 
serialized form of multiple columns or complicated column types. They'll end 
up using a custom class for type T that overrides toString.
   
   Adding this as a function that takes a string or binary parameter would 
require more work, and it would force end users to provide the serialized form 
as input if they want to pass multiple columns or a non-primitive column. But 
at least they would know what they are doing when using the function, and 
there's no magic behind the curtain, so there should be no issue with 
serialization. Functions like `to_json` would make serialization easy. It's up 
to end users which column(s) they pass, and all columns are still retained.
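
   A rough Python sketch of that idea, assuming the caller serializes the 
chosen column to one JSON document per line (here `json.dumps` plays the role 
that `to_json` would play in Spark). The helper `pipe_column`, the `piped` key, 
and the child script are all hypothetical, not a proposed API.

```python
import json
import subprocess
import sys

# Hypothetical child: reads one JSON document per line, emits one per line.
CHILD_SOURCE = """
import json, sys
for line in sys.stdin:
    payload = json.loads(line)
    print(json.dumps({"length": len(payload["text"])}))
"""


def pipe_column(rows, column, command):
    """Pipe a single column as JSON lines and attach the output to each row."""
    # json.dumps escapes line breaks, so each row occupies exactly one line.
    stdin_text = "".join(json.dumps({column: row[column]}) + "\n" for row in rows)
    result = subprocess.run(
        command, input=stdin_text, capture_output=True, text=True, check=True
    )
    outputs = result.stdout.splitlines()
    if len(outputs) != len(rows):
        raise ValueError("child process did not return one line per input row")
    # All original columns are retained; the output is added as a new one.
    return [dict(row, piped=out) for row, out in zip(rows, outputs)]


rows = [
    {"id": 1, "text": "single line"},
    {"id": 2, "text": "two\nlines"},
]
piped_rows = pipe_column(rows, "text", [sys.executable, "-c", CHILD_SOURCE])
for row in piped_rows:
    print(row["id"], row["piped"])
```

   Because the serialized form is explicit, the embedded line break in the 
second row survives the round trip and the `id` column stays attached to its 
row.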
   
   > UDF/expression not work because pipe is not 1-to-1 relation between input 
and output. For example piping through wc -l gets single output per partition.
   
   It would feel more natural to me if the relation were 1-to-1 for the normal 
case and N-to-1 "across partitions" for aggregation (so the output would be 
`array[string]` unless a merge function can be provided). It seems a bit 
arbitrary to me that the forked process can decide whether it aggregates or 
not.
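
   A small Python sketch of why this looks arbitrary: the same piping code 
yields either one output per input or one output per partition, depending only 
on the command. Partitions are modeled as plain lists, and `ECHO`, `COUNT`, 
and `pipe_partition` are illustrative names (stand-ins for `cat` and `wc -l`), 
not Spark APIs.

```python
import subprocess
import sys

# Hypothetical stand-ins for `cat` and `wc -l`.
ECHO = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"]
COUNT = [sys.executable, "-c", "import sys; print(sum(1 for _ in sys.stdin))"]


def pipe_partition(lines, command):
    """Feed one partition to a fresh child process; return its output lines."""
    stdin_text = "".join(line + "\n" for line in lines)
    result = subprocess.run(
        command, input=stdin_text, capture_output=True, text=True, check=True
    )
    return result.stdout.splitlines()


partitions = [["a", "b", "c"], ["d", "e"]]

# Echo keeps a 1-to-1 relation between input and output lines.
echo_out = [pipe_partition(p, ECHO) for p in partitions]
# Counting collapses each partition to a single line (N-to-1 per partition).
count_out = [pipe_partition(p, COUNT) for p in partitions]

print(echo_out)
print(count_out)
```

   Nothing on the caller's side declares which of the two shapes to expect, 
which is the part I find hard to reason about.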

