[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138856#comment-16138856 ]

Evan Chan commented on SPARK-12449:
-----------------------------------

Andrew and others: Is there a plan to make this CatalystSource available, or to contribute it back to Spark somehow?

> Pushing down arbitrary logical plans to data sources
>
>                 Key: SPARK-12449
>                 URL: https://issues.apache.org/jira/browse/SPARK-12449
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Stephan Kessler
>         Attachments: pushingDownLogicalPlans.pdf
>
> With the help of the DataSource API we can pull data from external sources
> for processing. Implementing interfaces such as {{PrunedFilteredScan}} allows
> pushing down filters and projections, pruning unnecessary fields and rows
> directly in the data source.
> However, data sources such as SQL engines are capable of doing even more
> preprocessing, e.g., evaluating aggregates. This is beneficial because it
> would reduce the amount of data transferred from the source to Spark. The
> existing interfaces do not allow this kind of processing in the source.
> We propose adding a new interface {{CatalystSource}} that allows deferring
> the processing of arbitrary logical plans to the data source. We have
> already presented the details at Spark Summit Europe 2015:
> [https://spark-summit.org/eu-2015/events/the-pushdown-of-everything/]
> I will add a design document explaining the details.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
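For readers landing here, a minimal sketch of what such an interface could look like - the trait name comes from the proposal, while the method names and signatures are assumptions, not the actual design:

{code}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

trait CatalystSource {
  /** Whether the source can evaluate this plan fragment entirely by itself. */
  def supportsLogicalPlan(plan: LogicalPlan): Boolean

  /** Run the pushed-down plan inside the source and return its rows. */
  def logicalPlanScan(plan: LogicalPlan): RDD[Row]
}
{code}

A planner rule would call {{supportsLogicalPlan}} on the largest plan fragment rooted at the relation and fall back to the ordinary scan interfaces when it returns false.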
[jira] [Updated] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Evan Chan updated SPARK-13219:
------------------------------

Hi Gagan,

That is an interesting optimization, but not the same one that Venu speaks of (I worked on those optimizations). Basically, those optimizations are for cases where the column named in the WHERE clause is present in both tables, and my impression is that this is what this fix is for as well.

Your case would be very useful too. You can do it in two steps, though: first do the lookup of postal codes from location, then translate your select from address into an IN condition. Of course it's better if Spark does this, so that the results don't have to be passed back through the driver.

> Pushdown predicate propagation in SparkSQL with join
>
>                 Key: SPARK-13219
>                 URL: https://issues.apache.org/jira/browse/SPARK-13219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.4.1, 1.5.2, 1.6.0
>         Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>            Reporter: Abhinav Chawade
>
> When two or more tables are joined in SparkSQL and the query has an equality
> clause on the attributes used to perform the join, it is useful to apply that
> clause to the scans of both tables. If this is not done, one of the tables
> requires a full scan, which can slow the query down dramatically. Consider
> the following example with two tables being joined.
> {code}
> CREATE TABLE assets (
>     assetid int PRIMARY KEY,
>     address text,
>     propertyname text
> )
> CREATE TABLE tenants (
>     assetid int PRIMARY KEY,
>     name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>    Filter (CAST(assetid#13, DoubleType) = 1201.0)
>     HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, Some(t)), None
>   Exchange (HashPartitioning 200)
>    HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table,
> but it becomes cumbersome. It would be helpful if the query planner could
> improve filter propagation.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
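A quick sketch of that two-step workaround (the location/address tables come from the discussion above; the column names are assumptions):

{code}
// Step 1: look up the postal codes from `location` on the driver.
val codes = sqlContext
  .sql("SELECT postalcode FROM location WHERE city = 'San Francisco'")
  .collect()                          // note: results pass through the driver
  .map(_.getString(0))
  .mkString("'", "', '", "'")

// Step 2: rewrite the query against `address` as an IN condition.
val addresses = sqlContext.sql(
  s"SELECT * FROM address WHERE postalcode IN ($codes)")
{code}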
[jira] [Reopened] (SPARK-8133) sticky partitions
[ https://issues.apache.org/jira/browse/SPARK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Evan Chan reopened SPARK-8133:
------------------------------

I think this is worth looking into again - for streaming.

My team is creating Spark Streaming pipelines that do aggregations. For correctness and efficiency, if we can maintain a cache of current aggregation values across micro-batches, then we can lower the load on datastores and improve performance - without making the batch size too big (which leads to other problems).

Using Kafka to partition does not solve this problem, because we need to do groupBys, sorts, etc. on the incoming stream; in particular, we want the sorted output to be "sticky" to a particular node. Maintaining a cache or in-memory state requires "stickiness" of partitions to nodes.

We are exploring two avenues to do this and can contribute the work back:

1) By modifying the TaskSchedulerImpl, we can avoid shuffles of tasks when allocating executors/workers. This solves stickiness for clusters where the number of executors will not change.

2) Using a custom ShuffledRDD (or derived class) which can place the shuffled data partition on the same node given the same range of keys (assuming a HashPartitioner with a constant number of partitions). A sketch of this avenue follows this message.

> sticky partitions
>
>                 Key: SPARK-8133
>                 URL: https://issues.apache.org/jira/browse/SPARK-8133
>             Project: Spark
>          Issue Type: New Feature
>          Components: DStreams
>    Affects Versions: 1.3.1
>            Reporter: sid
>
> We are trying to replace Apache Storm with Apache Spark Streaming.
> In Storm, we partitioned the stream based on "Customer ID" so that messages
> with a given range of customer IDs would be routed to the same bolt (worker).
> We did this because each worker caches customer details (from the DB).
> So we split into 4 partitions, and each bolt (worker) held 1/4 of the
> entire range.
> I am hoping there is a solution to this in Spark Streaming.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
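A sketch of avenue (2), under stated assumptions: a stable list of executor hosts, a constant partition count, and hypothetical class/parameter names.

{code}
import scala.reflect.ClassTag

import org.apache.spark.{HashPartitioner, Partition}
import org.apache.spark.rdd.{RDD, ShuffledRDD}

// Pins each shuffle partition to a fixed host via preferred locations, so the
// same key range lands on the same node across micro-batches.
class StickyShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    prev: RDD[_ <: Product2[K, V]],
    part: HashPartitioner,
    hosts: IndexedSeq[String])
  extends ShuffledRDD[K, V, C](prev, part) {

  // Partition i always prefers the same host while `hosts` stays stable.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index % hosts.size))
}
{code}

Preferred locations are only hints to the scheduler, which is why the TaskSchedulerImpl change in avenue (1) is the stronger guarantee.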
[jira] [Commented] (SPARK-15687) Columnar execution engine
[ https://issues.apache.org/jira/browse/SPARK-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592429#comment-15592429 ]

Evan Chan commented on SPARK-15687:
-----------------------------------

[~kiszk] thanks for the PR... would you mind pointing me to the ColumnarBatch trait/API? I'd like to review that piece of it, but the code review is really, really long :)

Thanks

> Columnar execution engine
>
>                 Key: SPARK-15687
>                 URL: https://issues.apache.org/jira/browse/SPARK-15687
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>            Reporter: Reynold Xin
>            Priority: Critical
>
> This ticket tracks progress in making the entire engine columnar, especially
> in the context of nested data type support.
> In Spark 2.0, we have used the internal column batch interface in Parquet
> reading (via a vectorized Parquet decoder) and low cardinality aggregation.
> Other parts of the engine are already using whole-stage code generation,
> which is in many ways more efficient than a columnar execution engine for
> flat data types.
> The goal here is to figure out a story to work towards making column batch
> the common data exchange format between operators outside whole-stage code
> generation, as well as with external systems (e.g. Pandas).
> Some of the important questions to answer are:
> From the architectural perspective:
> - What is the end state architecture?
> - Should aggregation be columnar?
> - Should sorting be columnar?
> - How do we encode nested data? What are the operations on nested data, and
> how do we handle these operations in a columnar format?
> - What is the transition plan towards the end state?
> From an external API perspective:
> - Can we expose a more efficient column batch user-defined function API?
> - How do we leverage this to integrate with 3rd party tools?
> - Can we have a spec for a fixed version of the column batch format that can
> be externalized and use that in data source API v2?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-15687) Columnar execution engine
[ https://issues.apache.org/jira/browse/SPARK-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376418#comment-15376418 ]

Evan Chan edited comment on SPARK-15687 at 7/14/16 6:41 AM:
------------------------------------------------------------

[~rxin] I like the idea of an Iterator[ColumnBatch].

I would highly suggest that we make ColumnBatch and ColumnVector an interface/trait, so that we can support different implementations. This can encapsulate the ColumnarBatch etc. used in the Parquet reader, but also allow other implementations and columnar sources to take advantage in the future.

This trait would offer common methods, such as:

{code}
def numRows(): Int
def atRow(i: Int): A
def definedAt(i: Int): Boolean
{code}

etc.

I definitely would not want to create my own version of Spark just to work with my own columnar format, which is what I have to do right now :-p

was (Author: velvia):
[~rxin] I like the idea of an Iterator[ColumnBatch].

I would highly suggest that we make ColumnBatch and ColumnVector an interface/trait, so that we can support different implementations. This can encapsulate the ColumnarBatch etc. used in the Parquet reader, but also allow other implementations and columnar sources to take advantage in the future.

This trait would offer common methods, such as:

{code}
def numRows(): Int
def atRow(i: Int): A
def definedAt(i: Int): Boolean
{code}

etc.

> Columnar execution engine
>
>                 Key: SPARK-15687
>                 URL: https://issues.apache.org/jira/browse/SPARK-15687
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>            Reporter: Reynold Xin
>            Priority: Critical
>
> This ticket tracks progress in making the entire engine columnar, especially
> in the context of nested data type support.
> In Spark 2.0, we have used the internal column batch interface in Parquet
> reading (via a vectorized Parquet decoder) and low cardinality aggregation.
> Other parts of the engine are already using whole-stage code generation,
> which is in many ways more efficient than a columnar execution engine for
> flat data types.
> The goal here is to figure out a story to work towards making column batch
> the common data exchange format between operators outside whole-stage code
> generation, as well as with external systems (e.g. Pandas).
> Some of the important questions to answer are:
> From the architectural perspective:
> - What is the end state architecture?
> - Should aggregation be columnar?
> - Should sorting be columnar?
> - How do we encode nested data? What are the operations on nested data, and
> how do we handle these operations in a columnar format?
> - What is the transition plan towards the end state?
> From an external API perspective:
> - Can we expose a more efficient column batch user-defined function API?
> - How do we leverage this to integrate with 3rd party tools?
> - Can we have a spec for a fixed version of the column batch format that can
> be externalized and use that in data source API v2?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15687) Columnar execution engine
[ https://issues.apache.org/jira/browse/SPARK-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376418#comment-15376418 ]

Evan Chan commented on SPARK-15687:
-----------------------------------

[~rxin] I like the idea of an Iterator[ColumnBatch].

I would highly suggest that we make ColumnBatch and ColumnVector an interface/trait, so that we can support different implementations. This can encapsulate the ColumnarBatch etc. used in the Parquet reader, but also allow other implementations and columnar sources to take advantage in the future.

This trait would offer common methods, such as:

{code}
def numRows(): Int
def atRow(i: Int): A
def definedAt(i: Int): Boolean
{code}

etc.

> Columnar execution engine
>
>                 Key: SPARK-15687
>                 URL: https://issues.apache.org/jira/browse/SPARK-15687
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>            Reporter: Reynold Xin
>            Priority: Critical
>
> This ticket tracks progress in making the entire engine columnar, especially
> in the context of nested data type support.
> In Spark 2.0, we have used the internal column batch interface in Parquet
> reading (via a vectorized Parquet decoder) and low cardinality aggregation.
> Other parts of the engine are already using whole-stage code generation,
> which is in many ways more efficient than a columnar execution engine for
> flat data types.
> The goal here is to figure out a story to work towards making column batch
> the common data exchange format between operators outside whole-stage code
> generation, as well as with external systems (e.g. Pandas).
> Some of the important questions to answer are:
> From the architectural perspective:
> - What is the end state architecture?
> - Should aggregation be columnar?
> - Should sorting be columnar?
> - How do we encode nested data? What are the operations on nested data, and
> how do we handle these operations in a columnar format?
> - What is the transition plan towards the end state?
> From an external API perspective:
> - Can we expose a more efficient column batch user-defined function API?
> - How do we leverage this to integrate with 3rd party tools?
> - Can we have a spec for a fixed version of the column batch format that can
> be externalized and use that in data source API v2?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
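Fleshed out slightly, the trait sketched in the comment above might look like this (everything here illustrates the suggestion; it is not an existing Spark API):

{code}
trait ColumnVector[A] {
  def numRows(): Int
  def atRow(i: Int): A            // value at row i
  def definedAt(i: Int): Boolean  // false where the value is null/missing
}

trait ColumnBatch {
  def numColumns(): Int
  def numRows(): Int
  def column(i: Int): ColumnVector[_]
}
{code}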
[jira] [Commented] (SPARK-13662) [SQL][Hive] Have SHOW TABLES return additional fields from Hive MetaStore
[ https://issues.apache.org/jira/browse/SPARK-13662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245129#comment-15245129 ]

Evan Chan commented on SPARK-13662:
-----------------------------------

Vijay, that would be awesome! Please go ahead.

> [SQL][Hive] Have SHOW TABLES return additional fields from Hive MetaStore
>
>                 Key: SPARK-13662
>                 URL: https://issues.apache.org/jira/browse/SPARK-13662
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.5.2, 1.6.0
>         Environment: All
>            Reporter: Evan Chan
>
> Currently, the SHOW TABLES command in Spark's Hive ThriftServer, or
> equivalently the HiveContext.tables method, returns a DataFrame with only two
> columns: the name of the table and whether it is temporary. It would be
> really nice to add support for returning some extra information, such as:
> - Whether this table is Spark-only or a native Hive table
> - If Spark-only, the name of the data source
> - Potentially other properties
> The first two are really useful for BI environments that connect to multiple
> data sources and work with both Hive and Spark.
> Some thoughts:
> - The SQL/HiveContext Catalog API might need to be expanded to return
> something like a TableEntry, rather than just a tuple of (name, temporary).
> - I believe there is a Hive Catalog/client API to get information about each
> table. I suppose one concern would be the speed of using this API. Perhaps
> there are other APIs that can get this info faster.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-13662) [SQL][Hive] Have SHOW TABLES return additional fields from Hive MetaStore
Evan Chan created SPARK-13662:
---------------------------------

             Summary: [SQL][Hive] Have SHOW TABLES return additional fields from Hive MetaStore
                 Key: SPARK-13662
                 URL: https://issues.apache.org/jira/browse/SPARK-13662
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 1.6.0, 1.5.2
         Environment: All
            Reporter: Evan Chan

Currently, the SHOW TABLES command in Spark's Hive ThriftServer, or equivalently the HiveContext.tables method, returns a DataFrame with only two columns: the name of the table and whether it is temporary. It would be really nice to add support for returning some extra information, such as:

- Whether this table is Spark-only or a native Hive table
- If Spark-only, the name of the data source
- Potentially other properties

The first two are really useful for BI environments that connect to multiple data sources and work with both Hive and Spark.

Some thoughts:

- The SQL/HiveContext Catalog API might need to be expanded to return something like a TableEntry, rather than just a tuple of (name, temporary).
- I believe there is a Hive Catalog/client API to get information about each table. I suppose one concern would be the speed of using this API. Perhaps there are other APIs that can get this info faster.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
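As a sketch of the TableEntry idea from the description above (a hypothetical shape, not an actual Spark class):

{code}
case class TableEntry(
    name: String,
    isTemporary: Boolean,
    isSparkOnly: Boolean,                        // Spark-only vs. native Hive table
    dataSource: Option[String],                  // e.g. Some("parquet") when Spark-only
    properties: Map[String, String] = Map.empty) // any other table properties
{code}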
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15160340#comment-15160340 ]

Evan Chan commented on SPARK-13219:
-----------------------------------

[~smilegator] [~doodlegum] Guys, let me explain the strategy we used to fix join transitivity, which I believe is much more general and helps many more cases than the approach in PR 10490:

- First, we find all of the join columns and, for each join column, all the tables that are joined on it.
- Next, we discover all the predicates (currently equals and IN; this could be extended) that filter on those join columns.
- We compute, for each join column, the joined tables which are missing the predicates.
- We replicate the filter expression (using AND) for each missing table from the previous step.

The result is that no matter the number of tables and join columns, the predicates are augmented such that = and IN on literals are pushed to all the joined tables (a toy model of these steps follows this message). The only catch is that the current code works on unanalyzed logical plans, so we need to port it to work on analyzed logical plans instead.

> Pushdown predicate propagation in SparkSQL with join
>
>                 Key: SPARK-13219
>                 URL: https://issues.apache.org/jira/browse/SPARK-13219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.4.1, 1.5.2, 1.6.0
>         Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>            Reporter: Abhinav Chawade
>
> When two or more tables are joined in SparkSQL and the query has an equality
> clause on the attributes used to perform the join, it is useful to apply that
> clause to the scans of both tables. If this is not done, one of the tables
> requires a full scan, which can slow the query down dramatically. Consider
> the following example with two tables being joined.
> {code}
> CREATE TABLE assets (
>     assetid int PRIMARY KEY,
>     address text,
>     propertyname text
> )
> CREATE TABLE tenants (
>     assetid int PRIMARY KEY,
>     name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>    Filter (CAST(assetid#13, DoubleType) = 1201.0)
>     HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, Some(t)), None
>   Exchange (HashPartitioning 200)
>    HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table,
> but it becomes cumbersome. It would be helpful if the query planner could
> improve filter propagation.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
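A toy model of those four steps, independent of Catalyst (the case classes below are illustrative stand-ins, not Spark types):

{code}
case class Pred(table: String, column: String, expr: String)

// 1. join columns -> set of tables joined on each column
val joinCols: Map[String, Set[String]] =
  Map("assetid" -> Set("tenants", "assets"))

// 2. predicates (=, IN) that filter on those join columns
val preds = Seq(Pred("tenants", "assetid", "assetid = '1201'"))

// 3-4. replicate each predicate (ANDed in) onto every joined table missing it
val augmented = preds ++ preds.flatMap { p =>
  joinCols.getOrElse(p.column, Set.empty[String])
    .filterNot(_ == p.table)
    .map(t => p.copy(table = t))
}
// augmented now also carries Pred("assets", "assetid", "assetid = '1201'"),
// so both scans get the filter regardless of how many tables are joined.
{code}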
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15160333#comment-15160333 ]

Evan Chan commented on SPARK-13219:
-----------------------------------

Sorry, could you explain how SPARK-12957 affects this one?

> Pushdown predicate propagation in SparkSQL with join
>
>                 Key: SPARK-13219
>                 URL: https://issues.apache.org/jira/browse/SPARK-13219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.4.1, 1.5.2, 1.6.0
>         Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>            Reporter: Abhinav Chawade
>
> When two or more tables are joined in SparkSQL and the query has an equality
> clause on the attributes used to perform the join, it is useful to apply that
> clause to the scans of both tables. If this is not done, one of the tables
> requires a full scan, which can slow the query down dramatically. Consider
> the following example with two tables being joined.
> {code}
> CREATE TABLE assets (
>     assetid int PRIMARY KEY,
>     address text,
>     propertyname text
> )
> CREATE TABLE tenants (
>     assetid int PRIMARY KEY,
>     name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>    Filter (CAST(assetid#13, DoubleType) = 1201.0)
>     HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, Some(t)), None
>   Exchange (HashPartitioning 200)
>    HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table,
> but it becomes cumbersome. It would be helpful if the query planner could
> improve filter propagation.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157292#comment-15157292 ]

Evan Chan commented on SPARK-13219:
-----------------------------------

[~smilegator] [~doodlegum] What is the URL of the latest patch? I would like to contribute our code to this.

> Pushdown predicate propagation in SparkSQL with join
>
>                 Key: SPARK-13219
>                 URL: https://issues.apache.org/jira/browse/SPARK-13219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.4.1, 1.5.2, 1.6.0
>         Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>            Reporter: Abhinav Chawade
>
> When two or more tables are joined in SparkSQL and the query has an equality
> clause on the attributes used to perform the join, it is useful to apply that
> clause to the scans of both tables. If this is not done, one of the tables
> requires a full scan, which can slow the query down dramatically. Consider
> the following example with two tables being joined.
> {code}
> CREATE TABLE assets (
>     assetid int PRIMARY KEY,
>     address text,
>     propertyname text
> )
> CREATE TABLE tenants (
>     assetid int PRIMARY KEY,
>     name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>    Filter (CAST(assetid#13, DoubleType) = 1201.0)
>     HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, Some(t)), None
>   Exchange (HashPartitioning 200)
>    HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table,
> but it becomes cumbersome. It would be helpful if the query planner could
> improve filter propagation.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15151823#comment-15151823 ]

Evan Chan commented on SPARK-12449:
-----------------------------------

I think in the case of sources.Expressions, by the time they are pushed down, all aliases etc. should have been resolved already, so that should not be an issue, right?

Agree that capabilities would be important. If they didn't exist, then the default would be to not compute the expressions and let Spark's default aggregators do it, which means it would be like the filtering today, where there is double filtering.

> Pushing down arbitrary logical plans to data sources
>
>                 Key: SPARK-12449
>                 URL: https://issues.apache.org/jira/browse/SPARK-12449
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Stephan Kessler
>         Attachments: pushingDownLogicalPlans.pdf
>
> With the help of the DataSource API we can pull data from external sources
> for processing. Implementing interfaces such as {{PrunedFilteredScan}} allows
> pushing down filters and projections, pruning unnecessary fields and rows
> directly in the data source.
> However, data sources such as SQL engines are capable of doing even more
> preprocessing, e.g., evaluating aggregates. This is beneficial because it
> would reduce the amount of data transferred from the source to Spark. The
> existing interfaces do not allow this kind of processing in the source.
> We propose adding a new interface {{CatalystSource}} that allows deferring
> the processing of arbitrary logical plans to the data source. We have
> already presented the details at Spark Summit Europe 2015:
> [https://spark-summit.org/eu-2015/events/the-pushdown-of-everything/]
> I will add a design document explaining the details.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
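The capabilities idea could be as small as a hook like this (a hypothetical sketch; {{SourceExpression}} stands in for whatever public expression type a sources API would expose):

{code}
trait SourceExpression  // placeholder for a public sources-level expression type

trait ExpressionPushdownSource {
  /** Returns the subset of candidate expressions this source can evaluate. */
  def handledExpressions(exprs: Seq[SourceExpression]): Seq[SourceExpression]
}
// The planner would re-evaluate anything not in the returned set on the Spark
// side -- the same "double filtering" fallback used for Filters today.
{code}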
[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15151345#comment-15151345 ]

Evan Chan commented on SPARK-12449:
-----------------------------------

[~stephank85] would you have any code to share? :D

> Pushing down arbitrary logical plans to data sources
>
>                 Key: SPARK-12449
>                 URL: https://issues.apache.org/jira/browse/SPARK-12449
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Stephan Kessler
>         Attachments: pushingDownLogicalPlans.pdf
>
> With the help of the DataSource API we can pull data from external sources
> for processing. Implementing interfaces such as {{PrunedFilteredScan}} allows
> pushing down filters and projections, pruning unnecessary fields and rows
> directly in the data source.
> However, data sources such as SQL engines are capable of doing even more
> preprocessing, e.g., evaluating aggregates. This is beneficial because it
> would reduce the amount of data transferred from the source to Spark. The
> existing interfaces do not allow this kind of processing in the source.
> We propose adding a new interface {{CatalystSource}} that allows deferring
> the processing of arbitrary logical plans to the data source. We have
> already presented the details at Spark Summit Europe 2015:
> [https://spark-summit.org/eu-2015/events/the-pushdown-of-everything/]
> I will add a design document explaining the details.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15151268#comment-15151268 ]

Evan Chan commented on SPARK-12449:
-----------------------------------

I agree with [~maxseiden] on a gradual approach to push more down into the data sources API. Since I was going to explore a path like this anyway, I'd be willing to submit a PR to explore a `sources.Expression` kind of pushdown.

There is also some stuff in 2.0 that might interact with this, such as vectorization and whole-stage code generation, that we need to be aware of.

> Pushing down arbitrary logical plans to data sources
>
>                 Key: SPARK-12449
>                 URL: https://issues.apache.org/jira/browse/SPARK-12449
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Stephan Kessler
>         Attachments: pushingDownLogicalPlans.pdf
>
> With the help of the DataSource API we can pull data from external sources
> for processing. Implementing interfaces such as {{PrunedFilteredScan}} allows
> pushing down filters and projections, pruning unnecessary fields and rows
> directly in the data source.
> However, data sources such as SQL engines are capable of doing even more
> preprocessing, e.g., evaluating aggregates. This is beneficial because it
> would reduce the amount of data transferred from the source to Spark. The
> existing interfaces do not allow this kind of processing in the source.
> We propose adding a new interface {{CatalystSource}} that allows deferring
> the processing of arbitrary logical plans to the data source. We have
> already presented the details at Spark Summit Europe 2015:
> [https://spark-summit.org/eu-2015/events/the-pushdown-of-everything/]
> I will add a design document explaining the details.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
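One possible shape for such a {{sources.Expression}} tree (purely a sketch; today's org.apache.spark.sql.sources package only exposes Filter):

{code}
sealed trait Expression
case class ColumnRef(name: String)                            extends Expression
case class Literal(value: Any)                                extends Expression
case class BinaryOp(op: String, l: Expression, r: Expression) extends Expression // e.g. "+"
case class AggregateCall(func: String, child: Expression)     extends Expression // e.g. "sum"
{code}

Like the existing Filter contract, a source could report which expression trees it handled and let Spark re-evaluate the rest.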
[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15141243#comment-15141243 ]

Evan Chan commented on SPARK-12449:
-----------------------------------

[~rxin] I agree with [~stephank85] and others that this would be a huge help. At the very least, if the expressions could be pushed down, that would help a lot. Many databases are doing custom work to get the pushdowns they need, and I was thinking of doing something very similar and was going to propose something just like this.

> Pushing down arbitrary logical plans to data sources
>
>                 Key: SPARK-12449
>                 URL: https://issues.apache.org/jira/browse/SPARK-12449
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Stephan Kessler
>         Attachments: pushingDownLogicalPlans.pdf
>
> With the help of the DataSource API we can pull data from external sources
> for processing. Implementing interfaces such as {{PrunedFilteredScan}} allows
> pushing down filters and projections, pruning unnecessary fields and rows
> directly in the data source.
> However, data sources such as SQL engines are capable of doing even more
> preprocessing, e.g., evaluating aggregates. This is beneficial because it
> would reduce the amount of data transferred from the source to Spark. The
> existing interfaces do not allow this kind of processing in the source.
> We propose adding a new interface {{CatalystSource}} that allows deferring
> the processing of arbitrary logical plans to the data source. We have
> already presented the details at Spark Summit Europe 2015:
> [https://spark-summit.org/eu-2015/events/the-pushdown-of-everything/]
> I will add a design document explaining the details.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138024#comment-15138024 ]

Evan Chan commented on SPARK-13219:
-----------------------------------

[~smilegator] does your PR take care of the case where no JOIN clause is invoked? Does it also take care of multiple join conditions? (e.g., {{select ... from a a, b b, c c where a.col1 = b.col1 AND b.col1 = c.col1 AND ...}})

> Pushdown predicate propagation in SparkSQL with join
>
>                 Key: SPARK-13219
>                 URL: https://issues.apache.org/jira/browse/SPARK-13219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.4.1, 1.6.0
>         Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>            Reporter: Abhinav Chawade
>
> When two or more tables are joined in SparkSQL and the query has an equality
> clause on the attributes used to perform the join, it is useful to apply that
> clause to the scans of both tables. If this is not done, one of the tables
> requires a full scan, which can slow the query down dramatically. Consider
> the following example with two tables being joined.
> {code}
> CREATE TABLE assets (
>     assetid int PRIMARY KEY,
>     address text,
>     propertyname text
> )
> CREATE TABLE tenants (
>     assetid int PRIMARY KEY,
>     name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>    Filter (CAST(assetid#13, DoubleType) = 1201.0)
>     HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, Some(t)), None
>   Exchange (HashPartitioning 200)
>    HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table,
> but it becomes cumbersome. It would be helpful if the query planner could
> improve filter propagation.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12639) Improve Explain for DataSources with Handled Predicate Pushdowns
[ https://issues.apache.org/jira/browse/SPARK-12639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15088017#comment-15088017 ]

Evan Chan commented on SPARK-12639:
-----------------------------------

+1

> Improve Explain for DataSources with Handled Predicate Pushdowns
>
>                 Key: SPARK-12639
>                 URL: https://issues.apache.org/jira/browse/SPARK-12639
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Russell Alexander Spitzer
>            Priority: Minor
>
> SPARK-11661 improves handling of predicate pushdowns but has an unintended
> consequence of making the explain string more confusing. It basically makes
> it seem as if a source is always pushing down all of the filters (even those
> it cannot handle).
> This can have a confusing effect (I kept checking my code to see where I had
> broken something).
> {code:title=Query plan for source where nothing is handled by C* Source}
> Filter ((((a#71 = 1) && (b#72 = 2)) && (c#73 = 1)) && (e#75 = 1))
> +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@4b9cf75c[a#71,b#72,c#73,d#74,e#75,f#76,g#77,h#78] PushedFilters: [EqualTo(a,1), EqualTo(b,2), EqualTo(c,1), EqualTo(e,1)]
> {code}
> Although the telltale "Filter" step is present, my first instinct would tell
> me that the underlying source relation is using all of those filters.
> {code:title=Query plan for source where everything is handled by C* Source}
> Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@55d4456c[a#79,b#80,c#81,d#82,e#83,f#84,g#85,h#86] PushedFilters: [EqualTo(a,1), EqualTo(b,2), EqualTo(c,1), EqualTo(e,1)]
> {code}
> I think this would be much clearer if we changed the metadata key to
> "HandledFilters" and only listed those handled fully by the underlying source.
> Something like:
> {code:title=Proposed explain for pushdown where none of the predicates are handled by the underlying source}
> Filter ((((a#71 = 1) && (b#72 = 2)) && (c#73 = 1)) && (e#75 = 1))
> +- Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@4b9cf75c[a#71,b#72,c#73,d#74,e#75,f#76,g#77,h#78] HandledFilters: []
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11838) Spark SQL query fragment RDD reuse
[ https://issues.apache.org/jira/browse/SPARK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15086649#comment-15086649 ]

Evan Chan commented on SPARK-11838:
-----------------------------------

Based on everything that is said, it seems that instead of having SparkPlan compute an RDD[InternalRow], fragments detected to have been "cached" would get an alternate SparkPlan that just returns the previously computed RDD[InternalRow] (or UnsafeRow)?

Even if substitution into the LogicalPlan were done, it seems you would need some Strategy to parse the different operators generated by the substitution (i.e., for the in-memory cache, there needs to be the cacheManager substitution, then the InMemoryStrategy which returns an InMemoryColumnarScan. Actually this could be optimized, I think, to just substitution of an InMemoryDataSource by the cacheManager, so that a strategy is not needed).

> Spark SQL query fragment RDD reuse
>
>                 Key: SPARK-11838
>                 URL: https://issues.apache.org/jira/browse/SPARK-11838
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Mikhail Bautin
>
> With many analytical Spark SQL workloads against slowly changing tables,
> successive queries frequently share fragments that produce the same result.
> Instead of re-computing those fragments for every query, it makes sense to
> detect similar fragments and substitute RDDs previously created for matching
> SparkPlan fragments into every new SparkPlan at execution time whenever
> possible. Even if no RDDs are persist()-ed to memory/disk/off-heap memory,
> many stages can still be skipped due to map output files being present on
> executor nodes.
> The implementation involves the following steps:
> (1) Logical plan "canonicalization".
> Logical plans mapping to the same "canonical" logical plan should always
> produce the same results (except for possible output column reordering),
> although the inverse statement won't always be true.
> - Re-mapping expression ids to "canonical expression ids" (successively
> increasing numbers always starting with 1).
> - Eliminating alias names that are unimportant after analysis completion.
> Only the names that are necessary to determine the Hive table columns to be
> scanned are retained.
> - Reordering columns in projections, grouping/aggregation expressions, etc.
> This can be done, e.g., by using the string representation as a sort key. Union
> inputs always have to be reordered the same way.
> - Tree traversal has to happen starting from leaves and progressing towards
> the root, because we need to already have identified canonical expression ids
> for the children of a node before we can come up with sort keys that would
> allow reordering expressions in a node deterministically. This is a bit more
> complicated for Union nodes.
> - Special handling for MetastoreRelations. We replace MetastoreRelation
> with a special class CanonicalMetastoreRelation that uses attributes and
> partitionKeys as part of its equals() and hashCode() implementation, but the
> visible attributes and partitionKeys are restricted to expression ids that
> the rest of the query actually needs from that MetastoreRelation.
> An example of logical plans and corresponding canonical logical plans:
> https://gist.githubusercontent.com/mbautin/ef1317b341211d9606cf/raw
> (2) Tracking LogicalPlan fragments corresponding to SparkPlan fragments. When
> generating a SparkPlan, we keep an optional reference to a LogicalPlan
> instance in every node. This allows us to populate the cache with mappings
> from canonical logical plans of query fragments to the corresponding RDDs
> generated as part of query execution. Note that there is no new work
> necessary to generate the RDDs; we are merely utilizing the RDDs that would
> have been produced as part of SparkPlan execution anyway.
> (3) SparkPlan fragment substitution. After generating a SparkPlan and before
> calling prepare() or execute() on it, we check if any of its nodes have an
> associated LogicalPlan that maps to a canonical logical plan matching a cache
> entry. If so, we substitute a PhysicalRDD (or a new class UnsafePhysicalRDD
> wrapping an RDD of UnsafeRow) scanning the previously created RDD instead of
> the current query fragment. If the expected column order differs from what
> the current SparkPlan fragment produces, we add a projection to reorder the
> columns. We also add safe/unsafe row conversions as necessary to match the
> row type that is expected by the parent of the current SparkPlan fragment.
> (4) The execute() method of SparkPlan also needs to perform the cache lookup
> and substitution described above before producing a new RDD for the current
> SparkPlan node. The "loading cache" pattern (e.g. as implemented in Guava)
> allows reusing
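To make step (1)'s id remapping concrete, a toy illustration (plain Longs here stand in for Catalyst expression ids):

{code}
// Remap whatever ids a plan carries onto 1, 2, 3, ... in a deterministic
// traversal order, so equivalent plans compare equal.
def canonicalizeIds(idsInTraversalOrder: Seq[Long]): Map[Long, Long] =
  idsInTraversalOrder.distinct.zipWithIndex.map {
    case (id, i) => id -> (i + 1L)
  }.toMap

// canonicalizeIds(Seq(198L, 205L, 198L)) == Map(198L -> 1L, 205L -> 2L)
{code}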
[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's
[ https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495777#comment-14495777 ]

Evan Chan commented on SPARK-6703:
----------------------------------

I should note: having the jobserver support generic apps that don't implement an interface is an interesting idea (well, more like an implicit {{trait { def main(args: Array[String]) }}}, I suppose). The only way I could think of to have them share a context would be to have the job server load the job jars and start / call the main method. So what you describe might be useful.

We also do have users working with multiple contexts in the same JVM. However, we are working on support for one JVM per context.

Provide a way to discover existing SparkContext's
-------------------------------------------------

                 Key: SPARK-6703
                 URL: https://issues.apache.org/jira/browse/SPARK-6703
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core
    Affects Versions: 1.3.0
            Reporter: Patrick Wendell
            Assignee: Ilya Ganelin
            Priority: Critical

Right now it is difficult to write a Spark application in a way that can be run independently and also be composed with other Spark applications in an environment such as the JobServer, notebook servers, etc., where there is a shared SparkContext.

It would be nice to provide a rendezvous point so that applications can learn whether an existing SparkContext already exists before creating one.

The most simple/surgical way I see to do this is to have an optional static SparkContext singleton that can be retrieved as follows:

{code}
val sc = SparkContext.getOrCreate(conf = new SparkConf())
{code}

And you could also have a setter where some outer framework/server can set it for use by multiple downstream applications.

A more advanced version of this would have some named registry or something, but since we only support a single SparkContext in one JVM at this point anyway, this seems sufficient and much simpler.

Another advanced option would be to allow plugging in some other notion of configuration you'd pass when retrieving an existing context.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6703) Provide a way to discover existing SparkContext's
[ https://issues.apache.org/jira/browse/SPARK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493303#comment-14493303 ]

Evan Chan commented on SPARK-6703:
----------------------------------

Hey folks, thought I would just put in my 2 cents as the author of the Spark Jobserver.

What is the envisioned way for multiple applications to share the same SparkContext? Code has to be running in the same JVM, and for most applications there must already exist some shared knowledge of the framework or environment. This will affect whether this feature is useful or not.

For example, the Spark Jobserver requires jobs to implement an interface, and also manages creation of the SparkContext. That way, jobs get the SparkContext through a method call, and we can have other method calls to do things like input validation. What I'm saying is that this feature would have little existing value to job server users, as jobs in job server already have a way to discover the existing context, and to implement a good RESTful API, for example.

Another thing to think about is: what about SQLContext and HiveContext? I realize there is the JDBC server, but in job server we have a way to pass in alternative forms of the contexts. I suppose you could then add this method to a static SQLContext singleton as well.

Provide a way to discover existing SparkContext's
-------------------------------------------------

                 Key: SPARK-6703
                 URL: https://issues.apache.org/jira/browse/SPARK-6703
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core
    Affects Versions: 1.3.0
            Reporter: Patrick Wendell
            Assignee: Ilya Ganelin
            Priority: Critical

Right now it is difficult to write a Spark application in a way that can be run independently and also be composed with other Spark applications in an environment such as the JobServer, notebook servers, etc., where there is a shared SparkContext.

It would be nice to provide a rendezvous point so that applications can learn whether an existing SparkContext already exists before creating one.

The most simple/surgical way I see to do this is to have an optional static SparkContext singleton that can be retrieved as follows:

{code}
val sc = SparkContext.getOrCreate(conf = new SparkConf())
{code}

And you could also have a setter where some outer framework/server can set it for use by multiple downstream applications.

A more advanced version of this would have some named registry or something, but since we only support a single SparkContext in one JVM at this point anyway, this seems sufficient and much simpler.

Another advanced option would be to allow plugging in some other notion of configuration you'd pass when retrieving an existing context.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
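For reference, the kind of contract the job server imposes today, recalled from spark-jobserver's API - treat the names and signatures as approximate rather than authoritative:

{code}
import com.typesafe.config.Config
import org.apache.spark.SparkContext

sealed trait SparkJobValidation
case object SparkJobValid extends SparkJobValidation
case class SparkJobInvalid(reason: String) extends SparkJobValidation

trait SparkJob {
  /** Called first, so bad input can be rejected before the job runs. */
  def validate(sc: SparkContext, config: Config): SparkJobValidation

  /** The job body; the shared SparkContext is handed in, never created here. */
  def runJob(sc: SparkContext, config: Config): Any
}
{code}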
[jira] [Resolved] (SPARK-1283) Create spark-contrib repo for 1.0
[ https://issues.apache.org/jira/browse/SPARK-1283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Evan Chan resolved SPARK-1283.
------------------------------
    Resolution: Won't Fix

Yeah, this is not needed anymore with spark-packages.org.

Create spark-contrib repo for 1.0
---------------------------------

                 Key: SPARK-1283
                 URL: https://issues.apache.org/jira/browse/SPARK-1283
             Project: Spark
          Issue Type: Task
          Components: Project Infra
    Affects Versions: 1.0.0
            Reporter: Evan Chan
             Fix For: 1.0.0

Let's create a spark-contrib repo to host community projects for the Spark ecosystem that don't quite belong in core but are very important nevertheless. It would be linked to from the official Spark documentation and web site, and would help provide visibility for community projects.

Some questions:
- Who should host this repo, and where should it be hosted?
  - Github would be a strong preference from a usability standpoint
  - There is talk that Apache might have some facility for this
- Contents. Should it simply be links? Git submodules?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2593) Add ability to pass an existing Akka ActorSystem into Spark
[ https://issues.apache.org/jira/browse/SPARK-2593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14169545#comment-14169545 ]

Evan Chan commented on SPARK-2593:
----------------------------------

Hmmm :( I believe Spark already uses a shaded version of Akka with a different namespace. Unfortunately, it still creates some dependency conflicts down the chain, but I don't remember the details.

Add ability to pass an existing Akka ActorSystem into Spark
-----------------------------------------------------------

                 Key: SPARK-2593
                 URL: https://issues.apache.org/jira/browse/SPARK-2593
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: Helena Edelson

As a developer, I want to pass an existing ActorSystem into StreamingContext at load time so that I do not have two actor systems running on a node in an Akka application.

This would mean having Spark's actor system on its own named dispatchers, as well as exposing the now-private creation of its own actor system.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3298) [SQL] registerAsTable / registerTempTable overwrites old tables
[ https://issues.apache.org/jira/browse/SPARK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143716#comment-14143716 ]

Evan Chan commented on SPARK-3298:
----------------------------------

Sounds good, thanks!

-Evan

[SQL] registerAsTable / registerTempTable overwrites old tables
----------------------------------------------------------------

                 Key: SPARK-3298
                 URL: https://issues.apache.org/jira/browse/SPARK-3298
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.0.2
            Reporter: Evan Chan
            Assignee: Michael Armbrust
            Priority: Minor
              Labels: newbie

At least in Spark 1.0.2, calling registerAsTable("a") when "a" had been registered before does not cause an error. However, there is no way to access the old table, even though it may be cached and taking up space.

How about at least throwing an error?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2593) Add ability to pass an existing Akka ActorSystem into Spark
[ https://issues.apache.org/jira/browse/SPARK-2593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14136920#comment-14136920 ]

Evan Chan commented on SPARK-2593:
----------------------------------

[~pwendell] I'd have to agree with Helena and Tupshin; I'd like to see the usage of Akka increase; it is underutilized. Also note that it's not true that Akka is only used for RPC; there are Actors used in several places.

Performance and resilience would likely improve significantly in several areas with more usage of Akka; the current threads spun up by each driver for various things like a file server are quite wasteful.

Add ability to pass an existing Akka ActorSystem into Spark
-----------------------------------------------------------

                 Key: SPARK-2593
                 URL: https://issues.apache.org/jira/browse/SPARK-2593
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: Helena Edelson

As a developer, I want to pass an existing ActorSystem into StreamingContext at load time so that I do not have two actor systems running on a node in an Akka application.

This would mean having Spark's actor system on its own named dispatchers, as well as exposing the now-private creation of its own actor system.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3298) [SQL] registerAsTable / registerTempTable overwrites old tables
[ https://issues.apache.org/jira/browse/SPARK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14119373#comment-14119373 ]

Evan Chan commented on SPARK-3298:
----------------------------------

I can't really think of a good way to prevent people from overwriting their registered tables, though, without breaking the API. :/

SQL users would be used to getting an error if they CREATE TABLE'd on the same name. I guess this is not quite the same thing, but I would think most API users would not expect silent success. I suppose you could have a config option to make it silently fail if the table already exists, but this is no better than the existing behavior...

[SQL] registerAsTable / registerTempTable overwrites old tables
----------------------------------------------------------------

                 Key: SPARK-3298
                 URL: https://issues.apache.org/jira/browse/SPARK-3298
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.0.2
            Reporter: Evan Chan
            Priority: Minor
              Labels: newbie

At least in Spark 1.0.2, calling registerAsTable("a") when "a" had been registered before does not cause an error. However, there is no way to access the old table, even though it may be cached and taking up space.

How about at least throwing an error?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
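A toy version of the stricter "error instead of silent overwrite" behavior discussed in this ticket (the external registry is purely illustrative, since Spark 1.0.x keeps its catalog private; registerTempTable was called registerAsTable in 1.0.x):

{code}
import org.apache.spark.sql.SchemaRDD

val registered = scala.collection.mutable.Set.empty[String]

def registerTempTableStrict(rdd: SchemaRDD, name: String): Unit = {
  // Set.add returns false if the name was already present
  require(registered.add(name), s"Table '$name' is already registered")
  rdd.registerTempTable(name)
}
{code}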
[jira] [Created] (SPARK-3295) [Spark SQL] schemaRdd1 ++ schemaRdd2 does not return another SchemaRdd
Evan Chan created SPARK-3295:
---------------------------------

             Summary: [Spark SQL] schemaRdd1 ++ schemaRdd2 does not return another SchemaRdd
                 Key: SPARK-3295
                 URL: https://issues.apache.org/jira/browse/SPARK-3295
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.0.2
            Reporter: Evan Chan
            Priority: Minor

Right now, schemaRdd1.unionAll(schemaRdd2) returns a SchemaRDD. However, schemaRdd1 ++ schemaRdd2 returns an RDD[Row]. Similarly, schemaRdd1.union(schemaRdd2) returns an RDD[Row].

This is inconsistent. Let's make ++ and union have the same behavior as unionAll. Actually, I'm not sure there need to be both union and unionAll.

--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
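The reported behavior, as a quick REPL-style illustration (Spark 1.0.x types; the table names are assumed):

{code}
val a = sqlContext.sql("SELECT * FROM t1")   // SchemaRDD
val b = sqlContext.sql("SELECT * FROM t2")   // SchemaRDD

val u1 = a.unionAll(b)   // SchemaRDD -- schema preserved
val u2 = a ++ b          // RDD[Row]  -- falls back to the plain RDD method
val u3 = a.union(b)      // RDD[Row]  -- same problem
{code}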
[jira] [Created] (SPARK-3297) [Spark SQL][UI] SchemaRDD toString with many columns messes up Storage tab display
Evan Chan created SPARK-3297:
---------------------------------

             Summary: [Spark SQL][UI] SchemaRDD toString with many columns messes up Storage tab display
                 Key: SPARK-3297
                 URL: https://issues.apache.org/jira/browse/SPARK-3297
             Project: Spark
          Issue Type: Bug
          Components: SQL, Web UI
    Affects Versions: 1.0.2
            Reporter: Evan Chan
            Priority: Minor

When a SchemaRDD with many columns (for example, 57 columns in this example) is cached using sqlContext.cacheTable, the Storage tab of the driver Web UI display gets messed up, because the long string of the SchemaRDD causes the first column to be much, much wider than the others, and in fact much wider than the width of the browser.

It would be nice to have the first column be restricted to, say, 50% of the width of the browser window, with some minimum.

For example, this is the SchemaRDD text for my table:

RDD Storage Info for ExistingRdd [ActionGeo_ADM1Code#198,ActionGeo_CountryCode#199,ActionGeo_FeatureID#200,ActionGeo_FullName#201,ActionGeo_Lat#202,ActionGeo_Long#203,ActionGeo_Type#204,Actor1Code#205,Actor1CountryCode#206,Actor1EthnicCode#207,Actor1Geo_ADM1Code#208,Actor1Geo_CountryCode#209,Actor1Geo_FeatureID#210,Actor1Geo_FullName#211,Actor1Geo_Lat#212,Actor1Geo_Long#213,Actor1Geo_Type#214,Actor1KnownGroupCode#215,Actor1Name#216,Actor1Religion1Code#217,Actor1Religion2Code#218,Actor1Type1Code#219,Actor1Type2Code#220,Actor1Type3Code#221,Actor2Code#222,Actor2CountryCode#223,Actor2EthnicCode#224,Actor2Geo_ADM1Code#225,Actor2Geo_CountryCode#226,Actor2Geo_FeatureID#227,Actor2Geo_FullName#228,Actor2Geo_Lat#229,Actor2Geo_Long#230,Actor2Geo_Type#231,Actor2KnownGroupCode#232,Actor2Name#233,Actor2Religion1Code#234,Actor2Religion2Code#235,Actor2Type1Code#236,Actor2Type2Code#237,Actor2Type3Code#238,AvgTone#239,DATEADDED#240,Day#241,EventBaseCode#242,EventCode#243,EventId#244,EventRootCode#245,FractionDate#246,GoldsteinScale#247,IsRootEvent#248,MonthYear#249,NumArticles#250,NumMentions#251,NumSources#252,QuadClass#253,Year#254], MappedRDD[200]

I would personally love to fix the toString method to not necessarily print every column, but to cut it off after a while. This would aid the printout in the Spark shell as well. For example:

[ActionGeo_ADM1Code#198,ActionGeo_CountryCode#199,ActionGeo_FeatureID#200,ActionGeo_FullName#201,ActionGeo_Lat#202 and 52 more columns]

--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
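A minimal sketch of the truncated rendering proposed above (a hypothetical helper, not Spark code):

{code}
def abbreviateColumns(cols: Seq[String], max: Int = 5): String =
  if (cols.size <= max) cols.mkString("[", ",", "]")
  else cols.take(max).mkString("[", ",", s" and ${cols.size - max} more columns]")

// With the 57-column table above this yields:
// "[ActionGeo_ADM1Code#198,...,ActionGeo_Lat#202 and 52 more columns]"
{code}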
[jira] [Created] (SPARK-3299) [SQL] Public API in SQLContext to list tables
Evan Chan created SPARK-3299:
---
Summary: [SQL] Public API in SQLContext to list tables
Key: SPARK-3299
URL: https://issues.apache.org/jira/browse/SPARK-3299
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 1.0.2
Reporter: Evan Chan
Priority: Minor

There is no public API in SQLContext to list the current tables. This would be pretty helpful.
-- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
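A sketch of what the proposed addition might look like (the names {{tableNames}} and {{tableExists}} are assumptions, not an actual Spark API at the time of this report):
{code}
// Proposed surface for listing registered tables on SQLContext.
trait TableListing {
  /** Names of all tables currently registered in this context's catalog. */
  def tableNames(): Seq[String]

  /** Convenience check before registering, to avoid silent overwrites (see SPARK-3298). */
  def tableExists(name: String): Boolean = tableNames().contains(name)
}
{code}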
[jira] [Commented] (SPARK-3215) Add remote interface for SparkContext
[ https://issues.apache.org/jira/browse/SPARK-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14112931#comment-14112931 ] Evan Chan commented on SPARK-3215: -- [~vanzin] we should chat. I'm planning to move our Spark Job Server to have independent processes per SparkContext, and it is already Akka based. Something to consider: you still need somebody to manage all the SparkContexts that you create, make them HA, etc., so it is useful to have a layer like the job server.
> Add remote interface for SparkContext
> ---
> Key: SPARK-3215
> URL: https://issues.apache.org/jira/browse/SPARK-3215
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Reporter: Marcelo Vanzin
> Labels: hive
> Attachments: RemoteSparkContext.pdf
>
> A quick description of the issue: as part of running Hive jobs on top of Spark, it's desirable to have a SparkContext that is running in the background and listening for job requests for a particular user session. Running multiple contexts in the same JVM is not a very good solution: not only does SparkContext currently have issues sharing a JVM among multiple instances, but the JVM running the contexts also becomes a huge bottleneck in the system. So I'm proposing a solution where the SparkContext runs in a separate process and listens for requests from the client application via some RPC interface (most probably Akka). I'll attach a document shortly with the current proposal. Let's use this bug to discuss the proposal and any other suggestions.
-- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
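A rough sketch of the kind of Akka RPC protocol such a remote-context process might expose (all names here are assumptions for illustration; the actual design is in the attached RemoteSparkContext.pdf):
{code}
import akka.actor.{Actor, ActorLogging}

// Messages a client application might send to the remote context process.
sealed trait RemoteContextMsg
case class SubmitJob(jobId: String, serializedClosure: Array[Byte]) extends RemoteContextMsg
case class JobStatus(jobId: String) extends RemoteContextMsg
case object StopContext extends RemoteContextMsg

// An actor running alongside the long-lived SparkContext in its own process.
class RemoteContextActor extends Actor with ActorLogging {
  def receive = {
    case SubmitJob(id, closure) =>
      log.info(s"queuing job $id")        // would deserialize and run on the SparkContext
    case JobStatus(id) =>
      sender ! s"status of $id: unknown"  // placeholder reply
    case StopContext =>
      context.system.shutdown()           // tears down the whole context process
  }
}
{code}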
[jira] [Commented] (SPARK-2360) CSV import to SchemaRDDs
[ https://issues.apache.org/jira/browse/SPARK-2360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14106200#comment-14106200 ] Evan Chan commented on SPARK-2360: -- +1 for this feature. I just had to write something for importing tab-delimited CSVs and converting the types of each column. As for the API, it really needs to do type conversion into the built-in types; otherwise it hurts caching compression efficiency and query speed, and limits what functions can be run on the data. I think this is crucial. Maybe one can pass in a Map[String, ColumnType] or something like that; if a type is not specified for a column, it is assumed to be String.
> CSV import to SchemaRDDs
> ---
> Key: SPARK-2360
> URL: https://issues.apache.org/jira/browse/SPARK-2360
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Reporter: Michael Armbrust
> Assignee: Hossein Falaki
>
> I think the first step is to design the interface that we want to present to users. Mostly this is defining options when importing. Off the top of my head:
> - What is the separator?
> - Provide column names, or infer them from the first row?
> - How do we handle multiple files with possibly different schemas?
> - Do we have a method to let users specify the datatypes of the columns, or are they just strings?
> - What types of quoting / escaping do we want to support?
-- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
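A sketch of the typed import Evan describes above (the function, the ColumnType ADT, and the String default are assumptions for illustration, not an actual Spark API):
{code}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

sealed trait ColumnType
case object StringType extends ColumnType
case object IntType extends ColumnType
case object DoubleType extends ColumnType

// Parse a delimited file, converting each cell according to a per-column
// type map; columns without an entry default to String.
def csvToTypedRows(
    sc: SparkContext,
    path: String,
    header: Seq[String],
    types: Map[String, ColumnType],
    separator: String = "\t"): RDD[Seq[Any]] = {
  sc.textFile(path).map { line =>
    line.split(separator, -1).zip(header).map { case (cell, col) =>
      types.getOrElse(col, StringType) match {
        case IntType    => cell.toInt     // eager conversion keeps columnar compression effective
        case DoubleType => cell.toDouble
        case StringType => cell
      }
    }.toSeq
  }
}
{code}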
[jira] [Commented] (SPARK-2593) Add ability to pass an existing Akka ActorSystem into Spark
[ https://issues.apache.org/jira/browse/SPARK-2593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14070476#comment-14070476 ] Evan Chan commented on SPARK-2593: -- I would say that the base SparkContext should have this ability; after all, it creates multiple actors as well as a base ActorSystem. Sharing a single ActorSystem would also speed up all of Spark's tests, many of which repeatedly create and then tear down ActorSystems (since that's what the base SparkContext does).
> Add ability to pass an existing Akka ActorSystem into Spark
> ---
> Key: SPARK-2593
> URL: https://issues.apache.org/jira/browse/SPARK-2593
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: Helena Edelson
>
> As a developer I want to pass an existing ActorSystem into StreamingContext at load time so that I do not have two actor systems running on a node in an Akka application. This would mean putting Spark's actors on their own named dispatchers, as well as exposing the currently private creation of its actor system. I would like to create an Akka Extension that wraps Spark/Spark Streaming and Cassandra, so the programmatic creation for a user would simply be: {{val extension = SparkCassandra(system)}}
-- This message was sent by Atlassian JIRA (v6.2#6252)
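A sketch of the shape this could take (names here are hypothetical; Spark's actual ActorSystem creation is private to its environment setup): accept an optional externally owned ActorSystem and only create one when none is supplied.
{code}
import akka.actor.ActorSystem

// Hypothetical config carrying an optional caller-owned ActorSystem.
class SparkEnvConfig(val externalSystem: Option[ActorSystem] = None)

// Reuse the supplied system if present; otherwise create one as today.
def actorSystemFor(config: SparkEnvConfig): ActorSystem =
  config.externalSystem.getOrElse(ActorSystem("sparkDriver"))

// Usage, roughly as Helena describes for an Akka Extension:
// val system = ActorSystem("myApp")
// val env = new SparkEnvConfig(Some(system))  // Spark reuses the app's system
{code}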
[jira] [Commented] (SPARK-1283) Create spark-contrib repo for 1.0
[ https://issues.apache.org/jira/browse/SPARK-1283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13992974#comment-13992974 ] Evan Chan commented on SPARK-1283: -- ping. Any more comments? Any objections to creating a landing page for contrib projects in the Spark docs?
> Create spark-contrib repo for 1.0
> ---
> Key: SPARK-1283
> URL: https://issues.apache.org/jira/browse/SPARK-1283
> Project: Spark
> Issue Type: Task
> Components: Project Infra
> Affects Versions: 1.0.0
> Reporter: Evan Chan
> Fix For: 1.0.0
>
> Let's create a spark-contrib repo to host community projects for the Spark ecosystem that don't quite belong in core but are nevertheless very important. It would be linked to from the official Spark documentation and web site, and would help provide visibility for community projects. Some questions:
> - Who should host this repo, and where should it be hosted? GitHub would be a strong preference from a usability standpoint, and there is talk that Apache might have some facility for this.
> - Contents: should it simply be links? Git submodules?
-- This message was sent by Atlassian JIRA (v6.2#6252)