Hi, I need to write a rule to customize the join function using the Spark Catalyst optimizer. The objective is to duplicate the second dataset using this process:
- Execute a UDF on the column called x; this UDF returns an array.
- Execute an explode function on the new column.

In SQL terms, my objective is to execute this query on the second table:

    SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements. I have this rule:

    case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case join @ Join(left, right, _, Some(condition)) =>
          val attr = right.outputSet.find(x => x.toString().contains("x"))
          val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), Seq(attr.last.toAttribute))
          val explode = Explode(udf)
          val resolvedGenerator = Generate(explode, true, false, qualifier = None, udf.references.toSeq, right)
          var newRight = Project(resolvedGenerator.output, resolvedGenerator)
          Join(left, newRight, Inner, Option(condition))
      }
    }

But the problem is that the `Generate explode` operation appears many times in the physical plan. Do you have any other ideas? Maybe rewriting the code?

Thank you.
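To make the intended effect on the second table concrete, here is a minimal plain-Scala sketch of what `EXPLODE(foo(x))` should produce, without Spark. It is only a model of the semantics, not the rule itself: `Row2`, `explodeFoo`, `explodeKeepingRow`, and the body of `foo` are hypothetical names and placeholders chosen for illustration.

```scala
// Plain-Scala model of SELECT EXPLODE(foo(x)) FROM table2 (no Spark required).
object ExplodeModel {
  // Hypothetical row type for table2; only the column x is modeled.
  final case class Row2(x: Long)

  // Hypothetical stand-in for the UDF foo: returns x followed by x + 1.
  def foo(x: Long): Seq[Long] = Seq(x, x + 1)

  // EXPLODE(foo(x)): one output row per element of the array returned by foo.
  def explodeFoo(table2: Seq[Row2]): Seq[Long] =
    table2.flatMap(row => foo(row.x))

  // Variant that keeps the original row next to each generated value,
  // so every input row is duplicated once per array element.
  def explodeKeepingRow(table2: Seq[Row2]): Seq[(Row2, Long)] =
    table2.flatMap(row => foo(row.x).map(v => (row, v)))

  def main(args: Array[String]): Unit = {
    val table2 = Seq(Row2(1L), Row2(10L))
    println(explodeFoo(table2))
    println(explodeKeepingRow(table2))
  }
}
```

In this model each row of the second table should be expanded exactly once, which is the behavior I expect to see as a single `Generate explode` step in the plan.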