Re: CATALYST rule join
I don't fully understand your question, but you may want to check out this JIRA, especially the discussion in the comments: https://issues.apache.org/jira/browse/SPARK-17728. It discusses why Spark may execute a UDF multiple times.

Yong

From: tan shai
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join
Re: CATALYST rule join
Hi,

I need to write a rule that customizes joins using the Spark Catalyst optimizer. The objective is to duplicate the rows of the second dataset using this process:

- Execute a UDF on the column called x; this UDF returns an array.
- Execute an explode function on the new column.

In SQL terms, my objective is to execute this query on the second table:

SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements.

I have this rule:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.expressions.{Explode, ScalaUDF}
  import org.apache.spark.sql.catalyst.plans.Inner
  import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan, Project}
  import org.apache.spark.sql.catalyst.rules.Rule
  import org.apache.spark.sql.types.{ArrayType, LongType}

  case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case join @ Join(left, right, _, Some(condition)) =>
        // Pick the attribute of the right side whose name contains "x"
        val attr = right.outputSet.find(x => x.toString().contains("x"))
        // f and ipix are not shown in this message
        val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
          Seq(attr.last.toAttribute))
        val explode = Explode(udf)
        val resolvedGenerator = Generate(explode, true, false, qualifier = None,
          udf.references.toSeq, right)
        val newRight = Project(resolvedGenerator.output, resolvedGenerator)
        Join(left, newRight, Inner, Option(condition))
    }
  }

But the problem is that the `Generate explode` operation appears many times in the physical plan.

Do you have any other ideas? Maybe a way to rewrite the code?

Thank you.

2018-02-25 23:08 GMT+01:00 tan shai:
> [...]
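The repeated `Generate` may have a simpler cause than the UDF issue in SPARK-17728. As an assumption, not confirmed in this thread: in Spark 2.x, rules registered through `spark.experimental.extraOptimizations` run in a batch that Catalyst repeats until the plan stops changing, capped by `spark.sql.optimizer.maxIterations` (100 by default). `JoinRule` matches every `Join` with a condition, including the one it just produced, so each pass would wrap the right side in one more `Generate`. The sketch below models that in plain Scala with a toy plan tree; `Plan`, `Scan`, `Generate`, `Join`, `transform`, `runToFixedPoint` and `countGenerates` are hypothetical stand-ins, not Catalyst's classes.

```scala
// Toy plan tree (hypothetical): just enough node types to mimic JoinRule.
sealed trait Plan
final case class Scan(name: String) extends Plan
final case class Generate(child: Plan) extends Plan
final case class Join(left: Plan, right: Plan) extends Plan

// Top-down transform, like Catalyst's transform: apply the rule to a node,
// then recurse into the children of whatever the rule returned.
def transform(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan =
  rule.applyOrElse(plan, identity[Plan]) match {
    case Join(l, r)  => Join(transform(l)(rule), transform(r)(rule))
    case Generate(c) => Generate(transform(c)(rule))
    case s: Scan     => s
  }

// Re-apply a rule until the plan stops changing or the iteration cap is hit,
// mimicking a fixed-point optimizer batch.
def runToFixedPoint(plan: Plan, maxIterations: Int)(rule: Plan => Plan): (Plan, Int) = {
  var current = plan
  var iterations = 0
  var converged = false
  while (!converged && iterations < maxIterations) {
    val next = rule(current)
    iterations += 1
    converged = next == current
    current = next
  }
  (current, iterations)
}

def countGenerates(plan: Plan): Int = plan match {
  case Scan(_)     => 0
  case Generate(c) => 1 + countGenerates(c)
  case Join(l, r)  => countGenerates(l) + countGenerates(r)
}

// Naive rule: rewrites every Join, even one it already rewrote.
val naiveRule: Plan => Plan = p =>
  transform(p) { case Join(l, r) => Join(l, Generate(r)) }

// Guarded rule: leaves a Join alone once its right side is already a Generate.
val guardedRule: Plan => Plan = p =>
  transform(p) {
    case j @ Join(_, Generate(_)) => j
    case Join(l, r)               => Join(l, Generate(r))
  }

val start = Join(Scan("table1"), Scan("table2"))
val (naivePlan, naiveIters) = runToFixedPoint(start, maxIterations = 100)(naiveRule)
val (guardedPlan, guardedIters) = runToFixedPoint(start, maxIterations = 100)(guardedRule)
println(s"naive: ${countGenerates(naivePlan)} Generate nodes after $naiveIters passes")
println(s"guarded: ${countGenerates(guardedPlan)} Generate node after $guardedIters passes")
```

With the naive rule the plan never converges, so the loop stops at the 100-iteration cap with 100 `Generate` nodes; the guarded rule converges on its second pass with exactly one. In the real rule, the analogous guard would presumably be a case that skips a `Join` whose right child is already the `Project` over `Generate` that the rule created.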
CATALYST rule join
Hi,

I need to write a rule that customizes joins using the Spark Catalyst optimizer. The objective is to duplicate the rows of the second dataset using this process:

- Execute a UDF on the column called x; this UDF returns an array.
- Execute an explode function on the new column.

In SQL terms, my objective is to execute this query on the second table:

SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements.

I have this rule:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.expressions.{Explode, ScalaUDF}
  import org.apache.spark.sql.catalyst.plans.Inner
  import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan, Project}
  import org.apache.spark.sql.catalyst.rules.Rule
  import org.apache.spark.sql.types.{ArrayType, LongType}

  case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case join @ Join(left, right, _, Some(condition)) =>
        // Pick the attribute of the right side whose name contains "x"
        val attr = right.outputSet.find(x => x.toString().contains("x"))
        // f and ipix are not shown in this message
        val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
          Seq(attr.last.toAttribute))
        val explode = Explode(udf)
        val resolvedGenerator = Generate(explode, true, false, qualifier = None,
          udf.references.toSeq, right)
        val newRight = Project(resolvedGenerator.output, resolvedGenerator)
        Join(left, newRight, Inner, Option(condition))
    }
  }

But the problem is that the `Generate explode` operation appears many times in the physical plan.

Do you have any other ideas? Maybe a way to rewrite the code?

Thank you.
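To make the intended semantics concrete, here is a plain-Scala sketch of `SELECT EXPLODE(foo(x)) FROM table2` that keeps each original row next to every exploded element, which is what the rule's `Generate` call asks for by passing `true` as its `join` flag (in the Spark 2.2 signature it uses). `Row2`, `f` and `foo` are hypothetical: the thread does not say what `f` computes, so this `f` simply returns `x + 1`, and `foo` mirrors the rule's `x +: f(...)`.

```scala
// Hypothetical stand-in for a row of table2; only column x drives the explode.
final case class Row2(x: Long, payload: String)

// Hypothetical helper: the real f is not shown in the thread.
def f(x: Long): Seq[Long] = Seq(x + 1)

// Mirrors the UDF in the rule: prepend x to whatever f returns.
def foo(x: Long): Seq[Long] = x +: f(x)

// EXPLODE(foo(x)) with the input columns kept (Generate with join = true):
// each input row yields one output row per element of foo(x).
def explodeFoo(table2: Seq[Row2]): Seq[(Row2, Long)] =
  table2.flatMap(row => foo(row.x).map(elem => (row, elem)))

val table2 = Seq(Row2(1L, "a"), Row2(10L, "b"))
val exploded = explodeFoo(table2)
exploded.foreach(println)
```

Each row of `table2` appears once per array element, so a join against the exploded side sees every row duplicated `foo(x).size` times.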