Hi,

I need to write a rule that customizes the join using the Spark Catalyst
optimizer. The objective is to duplicate the rows of the second dataset
using this process:

- Execute a UDF on the column called x; this UDF returns an array

- Execute an explode function on the new column

In SQL terms, my objective is to execute this query on the second table:

    SELECT EXPLODE(foo(x)) from table2

where `foo` is a UDF that returns an array of elements.
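
At the DataFrame level the same transformation would look roughly like the
sketch below (the body of `foo` and the `table2` definition are only
placeholders for my real function and data):

    import org.apache.spark.sql.functions.{col, explode, udf}

    val table2 = spark.range(5).toDF("x")                 // placeholder data
    val foo = udf((x: Long) => x +: Seq(x + 1, x + 2))    // placeholder body
    val exploded = table2.select(explode(foo(col("x"))))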

I have this rule:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.expressions.{Explode, ScalaUDF}
    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan, Project}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.types.{ArrayType, LongType}

    case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

      override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case join @ Join(left, right, _, Some(condition)) =>
          // Find the "x" attribute on the right side of the join.
          val attr = right.output.find(_.name == "x").get
          // Wrap it in my UDF; f is defined elsewhere and returns a Seq[Long].
          val udf = ScalaUDF((x: Long) => x +: f(x), ArrayType(LongType), Seq(attr))
          val explode = Explode(udf)
          // Explode the array, duplicating the rows of the right side.
          val resolvedGenerator = Generate(explode, true, false, qualifier = None,
            udf.references.toSeq, right)
          val newRight = Project(resolvedGenerator.output, resolvedGenerator)
          Join(left, newRight, Inner, Some(condition))
      }
    }

But the problem is that the `Generate explode` operation appears many times
in the physical plan.
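
My guess is that this happens because Catalyst applies optimizer rules
repeatedly until the plan stops changing, so the pattern keeps matching the
join it has already rewritten and wraps a new `Generate` around the right
side each time. A guard on the pattern, something like the sketch below
(untested, with the rest of the rule body unchanged), might make the rule
idempotent, but it feels like a workaround:

    // Only rewrite joins whose right side does not already contain a
    // Generate, so the rule does not fire again on its own output.
    case join @ Join(left, right, _, Some(condition))
        if right.find(_.isInstanceOf[Generate]).isEmpty =>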


Do you have any other ideas? Maybe a different way of rewriting the code.

Thank you.
