grundprinzip opened a new pull request, #39291:
URL: https://github.com/apache/spark/pull/39291

   ### What changes were proposed in this pull request?
   
   This PR adds an extension mechanism to the Spark Connect protocol to support 
custom `Relation` and `Expression` types. This is necessary to support Spark 
extensions such as Delta, or custom Catalyst plugins.
   
   This is achieved by adding `protobuf.Any` fields to both `Relation` and 
`Expression`. To load the extensions, this PR adds two configuration options 
that indicate which plugin classes should be loaded:
   
     * `spark.connect.extensions.relation.classes`
     * `spark.connect.extensions.expression.classes`
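   
   As a minimal sketch of how the plugins might be registered, the implementing 
classes could be set like any other Spark configuration when the Connect server 
is started; the `org.example.*` class names below are hypothetical and stand in 
for the actual plugin implementations.
   
   ```
   import org.apache.spark.SparkConf
   
   // Hypothetical plugin classes registered via the new configuration options.
   val conf = new SparkConf()
     .set(
       "spark.connect.extensions.relation.classes",
       "org.example.ExampleRelationPlugin")
     .set(
       "spark.connect.extensions.expression.classes",
       "org.example.ExampleExpressionPlugin")
   ```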
   
   To add a new plugin, consumers implement either `RelationPlugin` or 
`ExpressionPlugin` and override the corresponding `transform()` method. If the 
plugin does not support the transformation of the given input, it must return 
`None`.
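   
   For reference, the plugin interfaces look roughly as follows. This is a 
sketch inferred from the examples below; the exact package and trait 
definitions in the PR may differ slightly.
   
   ```
   // Sketch of the plugin interfaces, inferred from the examples below.
   trait RelationPlugin {
     // Return Some(plan) if this plugin handles the message, None otherwise.
     def transform(
         relation: protobuf.Any,
         planner: SparkConnectPlanner): Option[LogicalPlan]
   }
   
   trait ExpressionPlugin {
     // Return Some(expression) if this plugin handles the message, None otherwise.
     def transform(
         relation: protobuf.Any,
         planner: SparkConnectPlanner): Option[Expression]
   }
   ```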
   
   Below is a simplified example of an expression and relation plugin.
   
   First, define the custom message types that are necessary for the particular 
input.
   
   ```
   message ExamplePluginRelation {
     Relation input = 1;
     string custom_field = 2;
   }
   
   message ExamplePluginExpression {
     Expression child = 1;
     string custom_field = 2;
   }
   ```
   
   Second, define the necessary `RelationPlugin` and `ExpressionPlugin` 
implementations.
   
   ```
   class ExampleRelationPlugin extends RelationPlugin {
     override def transform(
         relation: protobuf.Any,
         planner: SparkConnectPlanner): Option[LogicalPlan] = {
   
       if (!relation.is(classOf[proto.ExamplePluginRelation])) {
         return None
       }
       val plugin = relation.unpack(classOf[proto.ExamplePluginRelation])
       Some(planner.transformRelation(plugin.getInput))
     }
   }
   
   class ExampleExpressionPlugin extends ExpressionPlugin {
     override def transform(
         relation: protobuf.Any,
         planner: SparkConnectPlanner): Option[Expression] = {
       if (!relation.is(classOf[proto.ExamplePluginExpression])) {
         return None
       }
       val exp = relation.unpack(classOf[proto.ExamplePluginExpression])
       Some(
         Alias(planner.transformExpression(exp.getChild), exp.getCustomField)(
           explicitMetadata = None))
     }
   }
   ```
   
   Now, on the client side, the new extensions simply have to be encoded into 
the `protobuf.Any` value to be available once the plugins are loaded. Below is 
an example of wrapping the custom message type into a standard `Relation` with 
a `Range` child.
   
   ```
   Relation
     .newBuilder()
     .setExtension(
       protobuf.Any.pack(
         proto.ExamplePluginRelation
           .newBuilder()
           .setInput(
             proto.Relation
               .newBuilder()
               .setRange(proto.Range
                 .newBuilder()
                 .setStart(0)
                 .setEnd(10)
                 .setStep(1)))
           .build()))
   ```
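   
   The same pattern applies to expressions. The following sketch wraps 
`ExamplePluginExpression` into a standard `Expression`; it assumes the standard 
`UnresolvedAttribute` message for the child, and the attribute name `"id"` and 
the custom field value are only illustrative.
   
   ```
   Expression
     .newBuilder()
     .setExtension(
       protobuf.Any.pack(
         proto.ExamplePluginExpression
           .newBuilder()
           .setChild(proto.Expression
             .newBuilder()
             .setUnresolvedAttribute(proto.Expression.UnresolvedAttribute
               .newBuilder()
               .setUnparsedIdentifier("id")))
           .setCustomField("id_alias")
           .build()))
   ```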
   
   When the plan is transformed, the custom extensions behave like any other 
built-in functionality of Spark.
   
   
   ### Why are the changes needed?
   Extensibility
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, this adds a new extension mechanism to the Spark Connect protocol.
   
   
   ### How was this patch tested?
   Added test coverage for the plugin registry, and for end-to-end 
transformation and execution.

