[jira] [Resolved] (SPARK-26826) Array indexing functions array_allpositions and array_select
[ https://issues.apache.org/jira/browse/SPARK-26826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Petar Zecevic resolved SPARK-26826.
-----------------------------------
    Resolution: Won't Fix

> Array indexing functions array_allpositions and array_select
> -------------------------------------------------------------
>
>                 Key: SPARK-26826
>                 URL: https://issues.apache.org/jira/browse/SPARK-26826
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Petar Zecevic
>            Priority: Major
>
> This ticket proposes two extra array functions: {{array_allpositions}} (named
> after {{array_position}}) and {{array_select}}. These functions should make
> it easier to:
> * get an array of the indices of all occurrences of a value in an array
> ({{array_allpositions}})
> * select all elements of an array based on an array of indices
> ({{array_select}})
> Although higher-order functions, such as {{aggregate}} and {{transform}},
> have been added recently, performing the tasks above is still not simple,
> hence this addition.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
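The intended semantics of the two proposed functions can be sketched in plain Python on ordinary lists (hypothetical reference code, not a Spark implementation; indices are 1-based, matching Spark SQL's {{array_position}} convention):

```python
# Reference semantics for the two proposed functions, sketched on plain
# Python lists. Not Spark code; names follow the ticket's proposal.
# Indices are 1-based, as in Spark SQL's array_position.

def array_allpositions(arr, value):
    """Return the 1-based indices of all occurrences of value in arr."""
    return [i + 1 for i, x in enumerate(arr) if x == value]

def array_select(arr, indices):
    """Return the elements of arr at the given 1-based indices."""
    return [arr[i - 1] for i in indices]
```

For example, `array_allpositions([1, 2, 1, 3], 1)` yields `[1, 3]`, and feeding such an index array to `array_select` recovers the matching elements, which is exactly the two-step workflow the ticket wants to simplify.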
[jira] [Created] (SPARK-26826) Array indexing functions array_allpositions and array_select
Petar Zecevic created SPARK-26826:
-------------------------------------

             Summary: Array indexing functions array_allpositions and array_select
                 Key: SPARK-26826
                 URL: https://issues.apache.org/jira/browse/SPARK-26826
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 2.4.0
            Reporter: Petar Zecevic


This ticket proposes two extra array functions: `array_allpositions` (named after `array_position`) and `array_select`. These functions should make it easier to:
* get an array of the indices of all occurrences of a value in an array (`array_allpositions`)
* select all elements of an array based on an array of indices (`array_select`)

Although higher-order functions, such as `aggregate` and `transform`, have been added recently, performing the tasks above is still not simple, hence this addition.
[jira] [Updated] (SPARK-24020) Sort-merge join inner range optimization
[ https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Petar Zecevic updated SPARK-24020:
----------------------------------
    Description:

The problem we are solving is the case where you have two big tables partitioned by column X, but also sorted within partitions by column Y, and you need to calculate an expensive function on the joined rows that reduces the number of output rows (e.g. a condition based on a spatial distance calculation). You could theoretically reduce the number of joined rows for which the calculation itself is performed by using a range condition on the Y column. Something like this:

{{... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d AND }}

However, during a sort-merge join with this range condition specified, Spark will first cross-join all the rows with the same X value and only then try to apply the range condition and any function calculations. This happens because, inside the generated sort-merge join (SMJ) code, these extra conditions are put in the same block as the function being calculated, and there is no way to evaluate these conditions before reading all the rows to be checked into memory (into an {{ExternalAppendOnlyUnsafeRowArray}}). If the two tables have a large number of rows per X, this can result in a huge number of calculations and a huge number of rows in executor memory, which can be unfeasible.

h3. The solution implementation

We therefore propose a change to the sort-merge join so that, when these extra conditions are specified, a queue is used instead of the ExternalAppendOnlyUnsafeRowArray class. This queue is then used as a moving window over the values from the right relation as the left row changes. You could call this a combination of an equi-join and a theta join; in the literature it is sometimes called an "epsilon join". We call it a "sort-merge inner range join".

This design uses much less memory (not all rows with the same value of X need to be loaded into memory at once) and requires far fewer comparisons (how much fewer depends on the actual data and conditions used).

h3. The classes that need to be changed

To implement the described change we propose changes to these classes:
* _ExtractEquiJoinKeys_ – a pattern that needs to be extended to recognize the case where a simple range condition with lower and upper limits is used on a secondary column (a column not included in the equi-join condition). The pattern also needs to extract the information later required for code generation etc.
* _InMemoryUnsafeRowQueue_ – the moving-window implementation to be used instead of the _ExternalAppendOnlyUnsafeRowArray_ class. Rows need to be added to and removed from the structure as the left key (X) or the left secondary value (Y) changes, so the structure needs to be a queue. To make the change as minimally intrusive as possible, we propose implementing _InMemoryUnsafeRowQueue_ as a subclass of _ExternalAppendOnlyUnsafeRowArray_
* _JoinSelection_ – a strategy that uses _ExtractEquiJoinKeys_ and needs to be aware of the extracted range conditions
* _SortMergeJoinExec_ – the main implementation of the optimization. Needs to support two code paths:
** when whole-stage code generation is turned off (method doExecute, which uses sortMergeJoinInnerRangeScanner)
** when whole-stage code generation is turned on (methods doProduce and genScanner)
* _SortMergeJoinInnerRangeScanner_ – implements the SMJ with the inner-range optimization when whole-stage codegen is turned off
* _InnerJoinSuite_ – functional tests
* _JoinBenchmark_ – performance tests

h3. Triggering the optimization

The optimization should be triggered automatically when an equi-join expression is present AND lower and upper range conditions on a secondary column are specified. If the tables aren't sorted by both columns, appropriate sorts should be added. To limit the impact of this change we also propose adding a new parameter (tentatively named "spark.sql.join.smj.useInnerRangeOptimization") which could be used to switch off the optimization entirely.

h3. Applicable use cases

Potential use cases for this are joins based on spatial or temporal distance calculations.
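The moving-window idea can be sketched in plain Python (a hypothetical illustration of the algorithm, not Spark code): with both sides sorted by (X, Y), a queue of right-side rows is maintained so that, for the current left row, it holds exactly the right rows with the same X whose Y lies within [leftY - d, leftY + d]:

```python
from collections import deque

def smj_inner_range(left, right, d):
    """Sketch of the proposed sort-merge 'inner range' (epsilon) join.

    left, right: lists of (x, y) tuples, each sorted ascending by (x, y).
    Emits pairs of rows with equal x whose y values differ by at most d,
    keeping only a moving window of right rows in memory instead of
    buffering every right row that shares the current x.
    """
    out = []
    window = deque()  # right rows currently inside the range window
    j = 0             # next unread position in right
    for lx, ly in left:
        # Evict right rows the window has outgrown: smaller x, or y below
        # the lower bound for the current (nondecreasing) left y. They can
        # never match a later left row.
        while window and (window[0][0] < lx or
                          (window[0][0] == lx and window[0][1] < ly - d)):
            window.popleft()
        # Pull right rows forward: permanently skip dead rows, enqueue rows
        # within the upper bound, and stop at rows still ahead of the window.
        while j < len(right):
            rx, ry = right[j]
            if rx < lx or (rx == lx and ry < ly - d):
                j += 1                      # never needed again
            elif rx == lx and ry <= ly + d:
                window.append((rx, ry))     # inside [ly - d, ly + d]
                j += 1
            else:
                break                       # rx > lx, or ry > ly + d
        # Every queued row now satisfies rx == lx and |ry - ly| <= d.
        out.extend(((lx, ly), r) for r in window)
    return out
```

The window never holds more rows than actually fall inside the band for the current left row, which is the memory saving the proposal describes; a cross-join of all rows sharing an X value would instead buffer the whole group.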
[jira] [Updated] (SPARK-24020) Sort-merge join inner range optimization
[ https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Petar Zecevic updated SPARK-24020:
----------------------------------
    Attachment: SMJ-innerRange-PR24020-designDoc.pdf

> Sort-merge join inner range optimization
> ----------------------------------------
>
>                 Key: SPARK-24020
>                 URL: https://issues.apache.org/jira/browse/SPARK-24020
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Petar Zecevic
>            Priority: Major
>         Attachments: SMJ-innerRange-PR24020-designDoc.pdf
>
> The problem we are solving is the case where you have two big tables
> partitioned by column X, but also sorted by column Y (within partitions), and
> you need to calculate an expensive function on the joined rows. During a
> sort-merge join, Spark will cross-join all rows that have the same X
> value and calculate the function's value on all of them. If the two tables
> have a large number of rows per X, this can result in a huge number of
> calculations.
> We propose an optimization that would allow you to reduce the number
> of matching rows per X using a range condition on the Y columns of the two
> tables. Something like:
> ... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d
> The way SMJ is currently implemented, these extra conditions have no
> influence on the number of rows (per X) being checked, because these extra
> conditions are put in the same block as the function being calculated.
> Here we propose a change to the sort-merge join so that, when these extra
> conditions are specified, a queue is used instead of the
> ExternalAppendOnlyUnsafeRowArray class. This queue would then be used as a
> moving window across the values from the right relation as the left row
> changes. You could call this a combination of an equi-join and a theta join
> (we call it a "sort-merge inner range join").
> Potential use cases for this are joins based on spatial or temporal distance
> calculations.
> The optimization should be triggered automatically when an equi-join
> expression is present AND lower and upper range conditions on a secondary
> column are specified. If the tables aren't sorted by both columns,
> appropriate sorts should be added.
> To limit the impact of this change we also propose adding a new parameter
> (tentatively named "spark.sql.join.smj.useInnerRangeOptimization") which
> could be used to switch off the optimization entirely.
[jira] [Commented] (SPARK-24020) Sort-merge join inner range optimization
[ https://issues.apache.org/jira/browse/SPARK-24020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444335#comment-16444335 ]

Petar Zecevic commented on SPARK-24020:
---------------------------------------

No, this implementation only applies to equi-joins that have range conditions on different columns. You can think of it as an equi-join with "sub-band" conditions, hence the name we gave it ("sort-merge inner range join").

> Sort-merge join inner range optimization
> ----------------------------------------
>
>                 Key: SPARK-24020
>                 URL: https://issues.apache.org/jira/browse/SPARK-24020
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Petar Zecevic
>            Priority: Major
>
[jira] [Created] (SPARK-24020) Sort-merge join inner range optimization
Petar Zecevic created SPARK-24020:
-------------------------------------

             Summary: Sort-merge join inner range optimization
                 Key: SPARK-24020
                 URL: https://issues.apache.org/jira/browse/SPARK-24020
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Petar Zecevic


The problem we are solving is the case where you have two big tables partitioned by column X, but also sorted by column Y (within partitions), and you need to calculate an expensive function on the joined rows. During a sort-merge join, Spark will cross-join all rows that have the same X value and calculate the function's value on all of them. If the two tables have a large number of rows per X, this can result in a huge number of calculations.

We propose an optimization that would allow you to reduce the number of matching rows per X using a range condition on the Y columns of the two tables. Something like:

... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d

The way SMJ is currently implemented, these extra conditions have no influence on the number of rows (per X) being checked, because these extra conditions are put in the same block as the function being calculated.

Here we propose a change to the sort-merge join so that, when these extra conditions are specified, a queue is used instead of the ExternalAppendOnlyUnsafeRowArray class. This queue would then be used as a moving window across the values from the right relation as the left row changes. You could call this a combination of an equi-join and a theta join (we call it a "sort-merge inner range join").

Potential use cases for this are joins based on spatial or temporal distance calculations.

The optimization should be triggered automatically when an equi-join expression is present AND lower and upper range conditions on a secondary column are specified. If the tables aren't sorted by both columns, appropriate sorts should be added.

To limit the impact of this change we also propose adding a new parameter (tentatively named "spark.sql.join.smj.useInnerRangeOptimization") which could be used to switch off the optimization entirely.
[jira] [Commented] (SPARK-13313) Strongly connected components doesn't find all strongly connected components
[ https://issues.apache.org/jira/browse/SPARK-13313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15200028#comment-15200028 ]

Petar Zecevic commented on SPARK-13313:
---
Ok, thanks for reporting. I'll look into this.

> Strongly connected components doesn't find all strongly connected components
>
> Key: SPARK-13313
> URL: https://issues.apache.org/jira/browse/SPARK-13313
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.6.0
> Reporter: Petar Zecevic
>
> The strongly connected components algorithm doesn't find all strongly connected components. I was using the Wikispeedia dataset (http://snap.stanford.edu/data/wikispeedia.html) and the algorithm found 519 SCCs, one of which had 4051 vertices that in reality don't have any edges between them.
> I think the problem could be on line 89 of the StronglyConnectedComponents.scala file, where EdgeDirection.In should be changed to EdgeDirection.Out. I believe the second Pregel call should use the Out edge direction, the same as the first call, because the direction is reversed in the provided sendMsg function (the message is sent to the source vertex, not the destination vertex).
> If that is changed (line 89), the algorithm starts finding many more SCCs, but eventually a stack overflow exception occurs. I believe graph objects that are changed through iterations should not be cached but checkpointed.
[jira] [Commented] (SPARK-13313) Strongly connected components doesn't find all strongly connected components
[ https://issues.apache.org/jira/browse/SPARK-13313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15147007#comment-15147007 ]

Petar Zecevic commented on SPARK-13313:
---
No, I don't think it's got anything to do with that. The largest SCC's vertices are not connected in any way, and they shouldn't be in the same group.

> Strongly connected components doesn't find all strongly connected components
>
> Key: SPARK-13313
> URL: https://issues.apache.org/jira/browse/SPARK-13313
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.6.0
> Reporter: Petar Zecevic
>
> The strongly connected components algorithm doesn't find all strongly connected components. I was using the Wikispeedia dataset (http://snap.stanford.edu/data/wikispeedia.html) and the algorithm found 519 SCCs, one of which had 4051 vertices that in reality don't have any edges between them.
> I think the problem could be on line 89 of the StronglyConnectedComponents.scala file, where EdgeDirection.In should be changed to EdgeDirection.Out. I believe the second Pregel call should use the Out edge direction, the same as the first call, because the direction is reversed in the provided sendMsg function (the message is sent to the source vertex, not the destination vertex).
> If that is changed (line 89), the algorithm starts finding many more SCCs, but eventually a stack overflow exception occurs. I believe graph objects that are changed through iterations should not be cached but checkpointed.
[jira] [Commented] (SPARK-13313) Strongly connected components doesn't find all strongly connected components
[ https://issues.apache.org/jira/browse/SPARK-13313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15146731#comment-15146731 ]

Petar Zecevic commented on SPARK-13313:
---
Yes, you need articles.tsv and links.tsv from this archive: http://snap.stanford.edu/data/wikispeedia/wikispeedia_paths-and-graph.tar.gz

Then parse the data, assign IDs to article names and create the graph:

// read article names, skipping blanks and comments, and pair each with a numeric ID
val articles = sc.textFile("articles.tsv", 6).
  filter(line => line.trim() != "" && !line.startsWith("#")).
  zipWithIndex().cache()
val links = sc.textFile("links.tsv", 6).
  filter(line => line.trim() != "" && !line.startsWith("#"))
// translate (srcName, dstName) pairs into (srcId, dstId) via two joins
val linkIndexes = links.map(x => { val spl = x.split("\t"); (spl(0), spl(1)) }).
  join(articles).map(x => x._2).
  join(articles).map(x => x._2)
val wikigraph = Graph.fromEdgeTuples(linkIndexes, 0)

Then get strongly connected components:

val wikiSCC = wikigraph.stronglyConnectedComponents(100)

The wikiSCC graph contains 519 SCCs, but there should be many more. The largest SCC in wikiSCC has 4051 vertices, and that's obviously wrong. The change in line 89 which I mentioned seems to solve this problem, but then other issues arise (stack overflow, etc.) and I don't have time to investigate further. I hope someone will look into this.

> Strongly connected components doesn't find all strongly connected components
>
> Key: SPARK-13313
> URL: https://issues.apache.org/jira/browse/SPARK-13313
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.6.0
> Reporter: Petar Zecevic
>
> The strongly connected components algorithm doesn't find all strongly connected components. I was using the Wikispeedia dataset (http://snap.stanford.edu/data/wikispeedia.html) and the algorithm found 519 SCCs, one of which had 4051 vertices that in reality don't have any edges between them.
> I think the problem could be on line 89 of the StronglyConnectedComponents.scala file, where EdgeDirection.In should be changed to EdgeDirection.Out. I believe the second Pregel call should use the Out edge direction, the same as the first call, because the direction is reversed in the provided sendMsg function (the message is sent to the source vertex, not the destination vertex).
> If that is changed (line 89), the algorithm starts finding many more SCCs, but eventually a stack overflow exception occurs. I believe graph objects that are changed through iterations should not be cached but checkpointed.
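As background for the edge-direction argument above: any SCC computation has to combine reachability in both edge directions, which is easiest to see in the classic two-pass Kosaraju algorithm, where the second pass deliberately walks the reversed graph. The sketch below is a plain-Python illustration of that principle only; it is not GraphX code and makes no claim about the Pregel-based implementation's internals.

```python
def kosaraju_scc(vertices, edges):
    """Kosaraju's algorithm: DFS finish order on the graph,
    then DFS on the *reversed* graph in reverse finish order."""
    adj = {v: [] for v in vertices}   # forward adjacency
    radj = {v: [] for v in vertices}  # reversed adjacency
    for u, v in edges:
        adj[u].append(v)
        radj[v].append(u)

    seen = set()

    def dfs(graph, start, visit):
        # Iterative DFS; calls visit(node) in post-order.
        stack = [(start, iter(graph[start]))]
        seen.add(start)
        while stack:
            node, it = stack[-1]
            nxt = next(it, None)
            if nxt is None:
                stack.pop()
                visit(node)
            elif nxt not in seen:
                seen.add(nxt)
                stack.append((nxt, iter(graph[nxt])))

    # Pass 1: record finish order on the original graph.
    order = []
    for v in vertices:
        if v not in seen:
            dfs(adj, v, order.append)

    # Pass 2: traverse the reversed graph; each tree found in
    # reverse finish order is exactly one SCC.
    seen.clear()
    comps = []
    for v in reversed(order):
        if v not in seen:
            comp = []
            dfs(radj, v, comp.append)
            comps.append(sorted(comp))
    return comps
```

For the cycle 1 → 2 → 3 → 1 with an extra edge 3 → 4, the sketch returns the components {1, 2, 3} and {4}; dropping the direction flip in the second pass would lump unrelated vertices together, which mirrors the symptom reported in this issue.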
[jira] [Created] (SPARK-13313) Strongly connected components doesn't find all strongly connected components
Petar Zecevic created SPARK-13313:
-
Summary: Strongly connected components doesn't find all strongly connected components
Key: SPARK-13313
URL: https://issues.apache.org/jira/browse/SPARK-13313
Project: Spark
Issue Type: Bug
Components: GraphX
Affects Versions: 1.6.0
Reporter: Petar Zecevic

The strongly connected components algorithm doesn't find all strongly connected components. I was using the Wikispeedia dataset (http://snap.stanford.edu/data/wikispeedia.html) and the algorithm found 519 SCCs, one of which had 4051 vertices that in reality don't have any edges between them.

I think the problem could be on line 89 of the StronglyConnectedComponents.scala file, where EdgeDirection.In should be changed to EdgeDirection.Out. I believe the second Pregel call should use the Out edge direction, the same as the first call, because the direction is reversed in the provided sendMsg function (the message is sent to the source vertex, not the destination vertex).

If that is changed (line 89), the algorithm starts finding many more SCCs, but eventually a stack overflow exception occurs. I believe graph objects that are changed through iterations should not be cached but checkpointed.
[jira] [Commented] (SPARK-6646) Spark 2.0: Rearchitecting Spark for Mobile Platforms
[ https://issues.apache.org/jira/browse/SPARK-6646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14390206#comment-14390206 ]

Petar Zecevic commented on SPARK-6646:
--
Good one :)

> Spark 2.0: Rearchitecting Spark for Mobile Platforms
>
> Key: SPARK-6646
> URL: https://issues.apache.org/jira/browse/SPARK-6646
> Project: Spark
> Issue Type: Improvement
> Components: Project Infra
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Priority: Blocker
> Attachments: Spark on Mobile - Design Doc - v1.pdf
>
> Mobile computing is quickly rising to dominance, and by the end of 2017, it is estimated that 90% of CPU cycles will be devoted to mobile hardware. Spark's project goal can be accomplished only when Spark runs efficiently for the growing population of mobile users.
> Designed and optimized for modern data centers and Big Data applications, Spark is unfortunately not a good fit for mobile computing today. In the past few months, we have been prototyping the feasibility of a mobile-first Spark architecture, and today we would like to share with you our findings. This ticket outlines the technical design of Spark's mobile support, and shares results from several early prototypes.
> Mobile friendly version of the design doc: https://databricks.com/blog/2015/04/01/spark-2-rearchitecting-spark-for-mobile.html