[jira] [Comment Edited] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15118468#comment-15118468 ] Wang, Gang edited comment on SPARK-13004 at 1/27/16 5:34 AM:
-
Yes, that is one of our prototypes for proof of concept. We are preparing to propose some specific changes for Spark, e.g. non-volatile checkpointing, caching, and storage, all of which would build on a non-volatile RDD. Thanks.

was (Author: qichfan): Yes, That is one of our prototypes for concept of proof. We are preparing to propose some specific changes for Spark. e.g. Non-volatile checkpoint. Non-volatile Caching and Storage, those would be all working around with non-volatile RDD. Thanks.

> Support Non-Volatile Data and Operations
>
> Key: SPARK-13004
> URL: https://issues.apache.org/jira/browse/SPARK-13004
> Project: Spark
> Issue Type: Epic
> Components: Input/Output, Spark Core
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Wang, Gang
> Labels: Non-VolatileRDD, Non-volatileComputing, RDD, performance
>
> Based on our experiments, SerDe-like operations have a significant negative performance impact on the majority of industrial Spark workloads, especially when the volume of the datasets is much larger than the system memory the Spark cluster has available for caching, checkpointing, shuffling/dispatching, data loading, and storing. JVM on-heap management also degrades performance under the pressure of large memory demand and frequent memory allocation/free operations.
> With the trend of adopting advanced server platform technologies, e.g. large-memory servers, non-volatile memory, and NVMe/fast SSD array storage, this project focuses on adopting new server platform features for Spark applications and retrofitting the use of hybrid addressable memory resources onto Spark wherever possible.
> *Data Object Management*
> * Using our non-volatile generic object programming model (NVGOP) to avoid SerDe and to reduce GC overhead.
> * Minimizing the memory footprint by loading data lazily.
> * Fitting naturally with the RDD schemas of non-volatile and off-heap RDDs.
> * Using non-volatile/off-heap RDDs to transform Spark datasets.
> * Avoiding the memory-caching step through in-place non-volatile RDD operations.
> * Avoiding checkpoints for Spark computation.
>
> *Data Memory Management*
> * Managing heterogeneous memory devices as a unified hybrid memory cache pool for Spark.
> * Using non-volatile memory-like devices for Spark checkpointing and shuffle.
> * Reclaiming allocated memory blocks automatically.
> * Providing a unified memory block API for general-purpose memory usage.
>
> *Computing Device Management*
> * AVX instructions, programmable FPGAs, and GPUs.
>
> Our customized Spark prototype has shown some potential improvements.
> [https://github.com/NonVolatileComputing/spark/tree/NonVolatileRDD]
> !http://bigdata-memory.github.io/images/Spark_mlib_kmeans.png|width=300!
> !http://bigdata-memory.github.io/images/total_GC_STW_pausetime.png|width=300!
>
> This epic tries to further improve Spark performance with our non-volatile solutions.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
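[Editor's note] The Data Object Management bullets center on reading fixed-layout records in place instead of deserializing JVM objects. The issue does not show the NVGOP API itself, so the following is only a minimal Python sketch of the in-place idea, using `mmap` over an ordinary file as a stand-in for a non-volatile memory region; the record layout and file name are invented for illustration:

```python
import mmap
import os
import struct
import tempfile

# Each record is a fixed-layout (id: int64, value: float64) pair, so any field
# sits at a computable offset and can be read straight from the mapped bytes.
RECORD = struct.Struct("<qd")

def write_records(path, records):
    """Persist records in a fixed binary layout (stand-in for a non-volatile heap)."""
    with open(path, "wb") as f:
        for rec in records:
            f.write(RECORD.pack(*rec))

def read_value(buf, i):
    """Fetch one field of record i directly from the mapped region (no SerDe pass)."""
    _, value = RECORD.unpack_from(buf, i * RECORD.size)
    return value

path = os.path.join(tempfile.mkdtemp(), "records.bin")
write_records(path, [(1, 0.5), (2, 1.5), (3, 2.5)])

with open(path, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
    total = sum(read_value(m, i) for i in range(3))  # lazy, record-at-a-time access
print(total)  # 4.5
```

Because every record has a fixed size, nothing is unpickled and no intermediate objects are built per record, which is the property the SerDe-avoidance and lazy-loading bullets rely on.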
[jira] [Closed] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang closed SPARK-13004.
--
Resolution: Later

Preparing actionable items, so closing it temporarily, as Sean Owen suggested.
[jira] [Commented] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15119970#comment-15119970 ] Wang, Gang commented on SPARK-13004:

I have closed it according to your advice. Thanks.
[jira] [Created] (SPARK-13004) Support Non-Volatile Data and Operations
Wang, Gang created SPARK-13004:
--
Summary: Support Non-Volatile Data and Operations
Key: SPARK-13004
URL: https://issues.apache.org/jira/browse/SPARK-13004
Project: Spark
Issue Type: Epic
Components: Input/Output, Spark Core
Affects Versions: 1.6.0, 1.5.0
Reporter: Wang, Gang

(The issue description is quoted in full earlier in this thread.)
[jira] [Updated] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-13004:
---
Description: (embedded image markup revised; the text is otherwise as quoted earlier in this thread)
[jira] [Updated] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-13004:
---
Description: (embedded image markup revised; the text is otherwise as quoted earlier in this thread)
[jira] [Updated] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-13004:
---
Description: (embedded image markup revised; the text is otherwise as quoted earlier in this thread)
[jira] [Commented] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15118288#comment-15118288 ] Wang, Gang commented on SPARK-13004:

Yes, That is one of prototype for concept of proof. We are preparing to propose some specific changes for Spark. e.g. Non-volatile checkpoint. Non-volatile Caching and Storage, those would be all working around with non-volatile RDD. Thanks.
[jira] [Comment Edited] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15118288#comment-15118288 ] Wang, Gang edited comment on SPARK-13004 at 1/27/16 12:44 AM:
--
Yes, That is one of our prototypes for concept of proof. We are preparing to propose some specific changes for Spark. e.g. Non-volatile checkpoint. Non-volatile Caching and Storage, those would be all working around with non-volatile RDD. Thanks.

was (Author: qichfan): Yes, That is one of prototype for concept of proof. We are preparing to propose some specific changes for Spark. e.g. Non-volatile checkpoint. Non-volatile Caching and Storage, those would be all working around with non-volatile RDD. Thanks.
[jira] [Commented] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15118468#comment-15118468 ] Wang, Gang commented on SPARK-13004: Yes, that is one of our prototypes for proof of concept. We are preparing to propose some specific changes for Spark, e.g. non-volatile checkpointing, and non-volatile caching and storage; those would all be built around a non-volatile RDD. Thanks. > Support Non-Volatile Data and Operations > > > Key: SPARK-13004 > URL: https://issues.apache.org/jira/browse/SPARK-13004 > Project: Spark > Issue Type: Epic > Components: Input/Output, Spark Core > Affects Versions: 1.5.0, 1.6.0 > Reporter: Wang, Gang > Labels: Non-VolatileRDD, Non-volatileComputing, RDD, performance
[jira] [Issue Comment Deleted] (SPARK-13004) Support Non-Volatile Data and Operations
[ https://issues.apache.org/jira/browse/SPARK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-13004: --- Comment: was deleted (was: Yes, That is one of our prototypes for concept of proof. We are preparing to propose some specific changes for Spark. e.g. Non-volatile checkpoint. Non-volatile Caching and Storage, those would be all working around with non-volatile RDD. Thanks.) > Support Non-Volatile Data and Operations > > > Key: SPARK-13004 > URL: https://issues.apache.org/jira/browse/SPARK-13004 > Project: Spark > Issue Type: Epic > Components: Input/Output, Spark Core > Affects Versions: 1.5.0, 1.6.0 > Reporter: Wang, Gang > Labels: Non-VolatileRDD, Non-volatileComputing, RDD, performance
[jira] [Updated] (SPARK-23373) Can not execute "count distinct" queries on parquet formatted table
[ https://issues.apache.org/jira/browse/SPARK-23373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-23373: --- Issue Type: Bug (was: New Feature) > Can not execute "count distinct" queries on parquet formatted table > --- > > Key: SPARK-23373 > URL: https://issues.apache.org/jira/browse/SPARK-23373 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.0 > Reporter: Wang, Gang > Priority: Major > > I failed to run the SQL "select count(distinct n_name) from nation"; the table > nation is stored in Parquet format, and the error trace is as follows. > _spark-sql> select count(distinct n_name) from nation;_ > _18/02/09 03:55:28 INFO main SparkSqlParser:54 Parsing command: select > count(distinct n_name) from nation_ > _Error in query: Table or view not found: nation; line 1 pos 35_ > _spark-sql> select count(distinct n_name) from nation_parquet;_ > _18/02/09 03:55:36 INFO main SparkSqlParser:54 Parsing command: select > count(distinct n_name) from nation_parquet_ > _18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: int_ > _18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: string_ > _18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: int_ > _18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: string_ > _18/02/09 03:55:36 INFO main CatalystSqlParser:54 Parsing command: > array_ > _18/02/09 03:55:38 INFO main FileSourceStrategy:54 Pruning directories with:_ > _18/02/09 03:55:38 INFO main FileSourceStrategy:54 Data Filters:_ > _18/02/09 03:55:38 INFO main FileSourceStrategy:54 Post-Scan Filters:_ > _18/02/09 03:55:38 INFO main FileSourceStrategy:54 Output Data Schema: > struct_ > _18/02/09 03:55:38 INFO main FileSourceScanExec:54 Pushed Filters:_ > _18/02/09 03:55:39 INFO main CodeGenerator:54 Code generated in 295.88685 ms_ > _18/02/09 03:55:39 INFO main HashAggregateExec:54 > spark.sql.codegen.aggregate.map.twolevel.enable is set to true, but current > version of 
codegened fast hashmap does not support this aggregate._ > _18/02/09 03:55:39 INFO main CodeGenerator:54 Code generated in 51.075394 ms_ > _18/02/09 03:55:39 INFO main HashAggregateExec:54 > spark.sql.codegen.aggregate.map.twolevel.enable is set to true, but current > version of codegened fast hashmap does not support this aggregate._ > _18/02/09 03:55:39 INFO main CodeGenerator:54 Code generated in 42.819226 ms_ > _18/02/09 03:55:39 INFO main ParquetFileFormat:54 parquetFilterPushDown is > true_ > _18/02/09 03:55:39 INFO main ParquetFileFormat:54 start filter class_ > _18/02/09 03:55:39 INFO main ParquetFileFormat:54 Pushed not defined_ > _18/02/09 03:55:39 INFO main ParquetFileFormat:54 end filter class_ > _18/02/09 03:55:39 INFO main MemoryStore:54 Block broadcast_0 stored as > values in memory (estimated size 305.0 KB, free 366.0 MB)_ > _18/02/09 03:55:39 INFO main MemoryStore:54 Block broadcast_0_piece0 stored > as bytes in memory (estimated size 27.6 KB, free 366.0 MB)_ > _18/02/09 03:55:39 INFO dispatcher-event-loop-7 BlockManagerInfo:54 Added > broadcast_0_piece0 in memory on 10.64.205.170:45616 (size: 27.6 KB, free: > 366.3 MB)_ > _18/02/09 03:55:39 INFO main SparkContext:54 Created broadcast 0 from > processCmd at CliDriver.java:376_ > _18/02/09 03:55:39 INFO main InMemoryFileIndex:54 Selected files after > partition pruning:_ > _PartitionDirectory([empty > row],ArrayBuffer(LocatedFileStatus\{path=hdfs://btd-dev-2425209.lvs01.dev.ebayc3.com:8020/apps/hive/warehouse/nation_parquet/00_0; > isDirectory=false; length=3216; replication=3; blocksize=134217728; > modification_time=1516619879024; access_time=0; owner=; group=; > permission=rw-rw-rw-; isSymlink=false}))_ > _18/02/09 03:55:39 INFO main FileSourceScanExec:54 Planning scan with bin > packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 > bytes._ > _18/02/09 03:55:39 ERROR main SparkSQLDriver:91 Failed in [select > count(distinct n_name) from nation_parquet]_ > 
{color:#FF}*_org.apache.spark.SparkException: Task not > serializable_*{color} > _at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)_ > _at > org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)_ > _at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)_ > _at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)_ > _at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)_ > _at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)_ > _at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)_ > _at >
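The *Task not serializable* failure above is raised by Spark's ClosureCleaner when a task closure drags in an object that cannot be serialized for shipping to executors. A minimal, Spark-free Python sketch of the same failure mode follows; the `Job` class and its lock are hypothetical, standing in for any non-serializable resource captured by a closure:

```python
import pickle
import threading

class Job:
    """Hypothetical driver-side object; the lock stands in for any
    non-serializable resource (connections, locks, contexts)."""
    def __init__(self):
        self.lock = threading.Lock()   # not picklable

    def task(self, x):
        return x * 2

job = Job()

# Shipping the bound method would require serializing `self`, which
# drags the lock along -- the analogue of Spark's ClosureCleaner
# rejecting a closure with "Task not serializable".
try:
    pickle.dumps(job.task)
    serializable = True
except TypeError:
    serializable = False
```

The usual fix is the same in both worlds: keep non-serializable state out of the closure, or mark it transient and recreate it on the executor side.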
[jira] [Updated] (SPARK-23373) Can not execute "count distinct" queries on parquet formatted table
[ https://issues.apache.org/jira/browse/SPARK-23373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-23373: --- Description: I failed to run the SQL "select count(distinct n_name) from nation"; the table nation is stored in Parquet format, and the error trace is as follows. [The quoted trace repeats the one above, with the HDFS host anonymized to hdfs://**.com.]
[jira] [Created] (SPARK-23373) Can not execute "count distinct" queries on parquet formatted table
Wang, Gang created SPARK-23373: -- Summary: Can not execute "count distinct" queries on parquet formatted table Key: SPARK-23373 URL: https://issues.apache.org/jira/browse/SPARK-23373 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.2.0 Reporter: Wang, Gang I failed to run the SQL "select count(distinct n_name) from nation"; the table nation is stored in Parquet format, and the query failed with org.apache.spark.SparkException: Task not serializable. [The full trace repeats the one quoted above.]
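The failing query itself is plain standard SQL, which makes the serialization error (rather than a query error) notable. As a sanity check of the expected semantics, the same COUNT(DISTINCT ...) runs against an in-memory SQLite table here; the sample rows are made up for illustration (TPC-H's real nation table has 25 rows):

```python
import sqlite3

# Stand-in for the Parquet-backed `nation` table from the report.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE nation (n_nationkey INT, n_name TEXT, n_regionkey INT)"
)
conn.executemany(
    "INSERT INTO nation VALUES (?, ?, ?)",
    [(0, "ALGERIA", 0), (1, "ARGENTINA", 1),
     (2, "BRAZIL", 1), (3, "BRAZIL", 1)],   # duplicate name on purpose
)

# Duplicate n_name values collapse under DISTINCT: 4 rows, 3 names.
(distinct_names,) = conn.execute(
    "SELECT COUNT(DISTINCT n_name) FROM nation"
).fetchone()
```

Any SQL engine should return the number of unique names here; the bug is that Spark 2.2.0's physical plan for this aggregate fails before producing a result at all.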
[jira] [Comment Edited] (SPARK-23373) Can not execute "count distinct" queries on parquet formatted table
[ https://issues.apache.org/jira/browse/SPARK-23373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358355#comment-16358355 ] Wang, Gang edited comment on SPARK-23373 at 2/9/18 1:01 PM: Yes, this seems related to my test environment. Still, I tried it in a Spark suite, in class _*PruneFileSourcePartitionsSuite*_, method test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule"). Adding _sql("select count(distinct id) from tbl").collect()_ produced the same exception. Could you please try it on your side? was (Author: gwang3): Yes. Seems related to my test environment. While, I tried in a Spark suite, in class _*PruneFileSourcePartitionsSuite*, method_ test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule"). Add _sql("select count(distinct id) from tbl").collect()_ __ got the same exception. Could you please have a try in your side? > Can not execute "count distinct" queries on parquet formatted table > --- > > Key: SPARK-23373 > URL: https://issues.apache.org/jira/browse/SPARK-23373 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.0 > Reporter: Wang, Gang > Priority: Major
[jira] [Commented] (SPARK-23373) Can not execute "count distinct" queries on parquet formatted table
[ https://issues.apache.org/jira/browse/SPARK-23373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358355#comment-16358355 ] Wang, Gang commented on SPARK-23373: Yes, this seems related to my test environment. Still, I tried it in a Spark suite, in class _*PruneFileSourcePartitionsSuite*_, method test("SPARK-20986 Reset table's statistics after PruneFileSourcePartitions rule"). Adding _sql("select count(distinct id) from tbl").collect()_ produced the same exception. Could you please try it on your side? > Can not execute "count distinct" queries on parquet formatted table > --- > > Key: SPARK-23373 > URL: https://issues.apache.org/jira/browse/SPARK-23373 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.2.0 > Reporter: Wang, Gang > Priority: Major
[jira] [Created] (SPARK-25401) Reorder the required ordering to match the table's output ordering for bucket join
Wang, Gang created SPARK-25401: -- Summary: Reorder the required ordering to match the table's output ordering for bucket join Key: SPARK-25401 URL: https://issues.apache.org/jira/browse/SPARK-25401 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.0 Reporter: Wang, Gang Currently, we check whether a SortExec is needed between an operator and its child in method orderingSatisfies, which requires the orders in the SortOrders to be exactly the same. However, consider the following case. * Table a is bucketed by (a1, a2), sorted by (a2, a1), and the bucket number is 200. * Table b is bucketed by (b1, b2), sorted by (b2, b1), and the bucket number is 200. * Table a joins table b on (a1=b1, a2=b2) In this case, if the join is a sort merge join, the query planner won't add an exchange on either side, but a sort will be added on both sides. Actually, the sort is also unnecessary: within corresponding buckets, such as bucket 1 of table a and bucket 1 of table b, (a1=b1, a2=b2) is equivalent to (a2=b2, a1=b1). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
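The key-order equivalence described above can be sketched as a toy check. This is illustrative Python only, not Spark's actual orderingSatisfies implementation; the function names are hypothetical:

```python
from itertools import permutations

def ordering_satisfies(child_order, required_keys):
    # Current-style check: the required keys must match the child's
    # output ordering exactly, in the same order.
    return list(child_order[:len(required_keys)]) == list(required_keys)

def ordering_satisfies_reordered(child_order, required_keys):
    # Proposed relaxation: since equi-join predicates are symmetric,
    # any permutation of the join keys is an acceptable sort order.
    return any(ordering_satisfies(child_order, list(p))
               for p in permutations(required_keys))

# Table sorted by (a2, a1); the join nominally requires (a1, a2).
assert not ordering_satisfies(("a2", "a1"), ("a1", "a2"))      # sort added today
assert ordering_satisfies_reordered(("a2", "a1"), ("a1", "a2"))  # sort avoidable
```

Under this relaxation, the planner would reorder the required ordering to (a2, a1) on both sides and skip the SortExec entirely.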
[jira] [Created] (SPARK-25411) Implement range partition in Spark
Wang, Gang created SPARK-25411: -- Summary: Implement range partition in Spark Key: SPARK-25411 URL: https://issues.apache.org/jira/browse/SPARK-25411 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.3.0 Reporter: Wang, Gang In our PROD environment, there are some partitioned fact tables, all of which are quite huge. To accelerate join execution, we need to make them bucketed as well. Then comes the problem: if the bucket number is large, there may be too many files (file count = bucket number * partition count), which may put pressure on HDFS; and if the bucket number is small, Spark will launch only an equally small number of tasks to read/write it. So, can we implement a new partition type that supports range values, just like range partitioning in Oracle/MySQL ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html])? Say, we could partition by a date column and make every two months a partition, or partition by an integer column with an interval of 1 per partition. Ideally, a feature like range partitioning should be implemented in Hive. However, it has always been hard to update the Hive version in a prod environment, and implementing it in Spark is much more lightweight and flexible.
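The file-count pressure can be shown with back-of-the-envelope arithmetic; the partition spans and bucket number below are invented for illustration:

```python
def file_count(partitions, buckets):
    # Worst case for a partitioned + bucketed table: every partition
    # writes one file per bucket.
    return partitions * buckets

# Daily partitions over 3 years, 1024 buckets:
daily = file_count(partitions=3 * 365, buckets=1024)
# Two-month range partitions over the same 3 years, same buckets:
ranged = file_count(partitions=3 * 12 // 2, buckets=1024)

assert daily == 1_121_280   # over a million small files on HDFS
assert ranged == 18_432     # coarser range partitions shrink it ~60x
```

Coarsening the partition grain via range partitioning keeps bucketing usable without exploding the HDFS namespace.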
[jira] [Commented] (SPARK-23839) consider bucket join in cost-based JoinReorder rule
[ https://issues.apache.org/jira/browse/SPARK-23839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421906#comment-16421906 ] Wang, Gang commented on SPARK-23839: Good point! Currently, Spark only takes data size into consideration when selecting the join type, while existing table traits such as bucketing and sorting are ignored. Say we have a query that qualifies for a bucket join: Spark may choose shuffle hash join rather than sort merge join, since rule JoinSelection checks whether the join matches shuffle hash join first, and all the estimates are based on data size; bucketing and sorting are not considered. > consider bucket join in cost-based JoinReorder rule > --- > > Key: SPARK-23839 > URL: https://issues.apache.org/jira/browse/SPARK-23839 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiaoju Wu >Priority: Minor > > Since spark 2.2, the cost-based JoinReorder rule is implemented and in Spark > 2.3 released, it is improved with histogram. While it doesn't take the cost > of the different join implementations. For example: > TableA JOIN TableB JOIN TableC > TableA will output 10,000 rows after filter and projection. > TableB will output 10,000 rows after filter and projection. > TableC will output 8,000 rows after filter and projection. > The current JoinReorder rule will possibly optimize the plan to join TableC > with TableA firstly and then TableB. But if the TableA and TableB are bucket > tables and can be applied with BucketJoin, it could be a different story. > > Also, to support bucket join of more than 2 tables when table bucket number > is multiple of another (SPARK-17570), whether bucket join can take effect > depends on the result of JoinReorder. For example of "a join b join c" which > has bucket number like 8, 12, 4, JoinReorder rule should optimize the order > to "c join a join b" to make the bucket join take effect. 
> > Based on current CBO JoinReorder, there are possibly 2 part to be changed: > # CostBasedJoinReorder rule is applied in optimizer phase while we do Join > selection in planner phase and bucket join optimization in EnsureRequirements > which is in preparation phase. Both are after optimizer. > # Current statistics and join cost formula are based data selectivity and > cardinality, we need to add statistics for present the join method cost like > shuffle, sort, hash and etc. Also we need to add the statistics into the > formula to estimate the join cost. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
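The "8, 12, 4" reordering example above can be sketched as follows. This is a simplified stand-in for the SPARK-17570 condition, assuming bucket join applies when one bucket count is a multiple of the other:

```python
def bucket_join_ok(b1, b2):
    # Hypothetical simplification of the SPARK-17570 rule: a bucket
    # join is possible when one bucket count divides the other.
    return b1 % b2 == 0 or b2 % b1 == 0

buckets = {"a": 8, "b": 12, "c": 4}

# "a join b join c" left-to-right: (a, b) is 8 vs 12 -> no bucket join.
assert not bucket_join_ok(buckets["a"], buckets["b"])
# Reordered "c join a join b": (c, a) is 4 vs 8 -> bucket join applies.
assert bucket_join_ok(buckets["c"], buckets["a"])
```

This is why JoinReorder would need cost terms for shuffle/sort avoidance: the two orders have identical cardinality estimates but very different physical costs.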
[jira] [Commented] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661610#comment-16661610 ] Wang, Gang commented on SPARK-25411: [~cloud_fan] What do you think of this feature? In our internal benchmark, it does improve performance significantly for huge-table joins with predicates. > Implement range partition in Spark > -- > > Key: SPARK-25411 > URL: https://issues.apache.org/jira/browse/SPARK-25411
[jira] [Commented] (SPARK-17570) Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple for tables
[ https://issues.apache.org/jira/browse/SPARK-17570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642730#comment-16642730 ] Wang, Gang commented on SPARK-17570: Any update? > Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple > for tables > - > > Key: SPARK-17570 > URL: https://issues.apache.org/jira/browse/SPARK-17570 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.0.0 >Reporter: Tejas Patil >Priority: Minor > > In case of bucketed tables, Spark will avoid doing `Sort` and `Exchange` if > the input tables and output table has same number of buckets. However, > unequal bucketing will always lead to `Sort` and `Exchange`. If the number of > buckets in the output table is a factor of the buckets in the input table, we > should be able to avoid `Sort` and `Exchange` and directly join those. > eg. > Assume Input1, Input2 and Output be bucketed + sorted tables over the same > columns but with different number of buckets. Input1 has 8 buckets, Input2 > has 4 buckets and Output has 4 buckets. Since hash-partitioning is done using > Modulus, if we JOIN buckets (0, 4) of Input1 and buckets (0, 4, 8) of Input2 > in the same task, it would give the bucket 0 of output table. > {noformat} > Input1 (0, 4) (1, 3) (2, 5) (3, 7) > Input2 (0, 4, 8) (1, 3, 9) (2, 5, 10) (3, 7, 11) > Output (0) (1) (2) (3) > {noformat}
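The modulus-based bucket correspondence in the description can be verified with a quick sketch (plain Python; the key's hash is replaced by the raw key value for simplicity):

```python
def bucket(key_hash, num_buckets):
    # Spark-style bucket assignment: hash modulo bucket count.
    return key_hash % num_buckets

# When the smaller bucket count (4) divides the larger one (8),
# every row in input bucket i of 8 lands in bucket i % 4 of 4.
# So output bucket 0 is fed exactly by input buckets {0, 4}.
for h in range(1000):
    assert bucket(h, 8) % 4 == bucket(h, 4)
```

This invariant is what lets a single task consume buckets (0, 4) of an 8-bucket table and produce bucket 0 of a 4-bucket table without a shuffle.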
[jira] [Updated] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-25411: --- Attachment: range partition design doc.pdf
[jira] [Commented] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627145#comment-16627145 ] Wang, Gang commented on SPARK-25411: Attached a design doc; please help review. > Implement range partition in Spark > -- > > Key: SPARK-25411 > URL: https://issues.apache.org/jira/browse/SPARK-25411
[jira] [Updated] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-25411: --- Attachment: range partition design doc.pdf
[jira] [Updated] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-25411: --- Attachment: (was: range partition design doc.pdf)
[jira] [Updated] (SPARK-26672) SinglePartition may not satisfies HashClusteredDistribution/OrderedDistribution
[ https://issues.apache.org/jira/browse/SPARK-26672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-26672: --- Summary: SinglePartition may not satisfies HashClusteredDistribution/OrderedDistribution (was: SinglePartition should not satisfies HashClusteredDistribution/OrderedDistribution) > SinglePartition may not satisfies > HashClusteredDistribution/OrderedDistribution > --- > > Key: SPARK-26672 > URL: https://issues.apache.org/jira/browse/SPARK-26672 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wang, Gang >Priority: Major > > If we are loading data to a *bucketed table* TEST_TABLE from another table > SRC_TABLE(bucketed or not) with sql: > insert overwrite table TEST_TABLE select * from SRC_TABLE limit 1000. > Data inserted into TEST_TABLE will not be bucketed since after LimitExec the > output partitioning will be SinglePartition, and in current logic, it > satisfies the HashClusteredDistribution, so no shuffle will be added. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26672) SinglePartition may not satisfies HashClusteredDistribution/OrderedDistribution
[ https://issues.apache.org/jira/browse/SPARK-26672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-26672: --- Description: If we are loading data to a *bucketed table* TEST_TABLE of which bucket number is not 1, from another table SRC_TABLE(bucketed or not) with sql: insert overwrite table TEST_TABLE select * from SRC_TABLE limit 1000. Data inserted into TEST_TABLE will not be bucketed since after LimitExec the output partitioning will be SinglePartition, and in current logic, it satisfies the HashClusteredDistribution, so no shuffle will be added. was: If we are loading data to a *bucketed table* TEST_TABLE from another table SRC_TABLE(bucketed or not) with sql: insert overwrite table TEST_TABLE select * from SRC_TABLE limit 1000. Data inserted into TEST_TABLE will not be bucketed since after LimitExec the output partitioning will be SinglePartition, and in current logic, it satisfies the HashClusteredDistribution, so no shuffle will be added. > SinglePartition may not satisfies > HashClusteredDistribution/OrderedDistribution > --- > > Key: SPARK-26672 > URL: https://issues.apache.org/jira/browse/SPARK-26672 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wang, Gang >Priority: Major > > If we are loading data to a *bucketed table* TEST_TABLE of which bucket > number is not 1, from another table SRC_TABLE(bucketed or not) with sql: > insert overwrite table TEST_TABLE select * from SRC_TABLE limit 1000. > Data inserted into TEST_TABLE will not be bucketed since after LimitExec the > output partitioning will be SinglePartition, and in current logic, it > satisfies the HashClusteredDistribution, so no shuffle will be added. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26672) SinglePartition should not satisfies HashClusteredDistribution/OrderedDistribution
Wang, Gang created SPARK-26672: -- Summary: SinglePartition should not satisfies HashClusteredDistribution/OrderedDistribution Key: SPARK-26672 URL: https://issues.apache.org/jira/browse/SPARK-26672 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Wang, Gang If we are loading data to a *bucketed table* TEST_TABLE from another table SRC_TABLE(bucketed or not) with sql: insert overwrite table TEST_TABLE select * from SRC_TABLE limit 1000. Data inserted into TEST_TABLE will not be bucketed since after LimitExec the output partitioning will be SinglePartition, and in current logic, it satisfies the HashClusteredDistribution, so no shuffle will be added. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
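The layout difference can be sketched in plain Python standing in for Spark's writer; `hash_clustered` is a hypothetical helper, not a Spark API:

```python
def hash_clustered(rows, key, num_buckets):
    # Expected layout for a bucketed table: rows routed by key hash,
    # as a proper shuffle (exchange) would do before the write.
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    return buckets

rows = [{"id": i} for i in range(1000)]

# With the shuffle: rows spread over all 8 bucket files.
spread = hash_clustered(rows, "id", 8)
assert all(len(b) > 0 for b in spread)

# After `... limit 1000` the plan is SinglePartition; if that is taken
# to satisfy HashClusteredDistribution, no shuffle is inserted and one
# writer task receives everything -> a single, unbucketed file.
single = [rows]
assert len(single) == 1 and len(single[0]) == 1000
```

Readers of the table would then assume hash-bucketed files and silently skip the exchange/sort, producing wrong results.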
[jira] [Closed] (SPARK-26672) SinglePartition may not satisfies HashClusteredDistribution/OrderedDistribution
[ https://issues.apache.org/jira/browse/SPARK-26672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang closed SPARK-26672. -- > SinglePartition may not satisfies > HashClusteredDistribution/OrderedDistribution > --- > > Key: SPARK-26672 > URL: https://issues.apache.org/jira/browse/SPARK-26672 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wang, Gang >Priority: Major > > If we are loading data to a *bucketed table* TEST_TABLE of which bucket > number is not 1, from another table SRC_TABLE(bucketed or not) with sql: > insert overwrite table TEST_TABLE select * from SRC_TABLE limit 1000. > Data inserted into TEST_TABLE will not be bucketed since after LimitExec the > output partitioning will be SinglePartition, and in current logic, it > satisfies the HashClusteredDistribution, so no shuffle will be added. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26672) SinglePartition may not satisfies HashClusteredDistribution/OrderedDistribution
[ https://issues.apache.org/jira/browse/SPARK-26672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang resolved SPARK-26672. Resolution: Not A Problem This is only a bug in our internal Spark version; the community version is fine. > SinglePartition may not satisfies > HashClusteredDistribution/OrderedDistribution > --- > > Key: SPARK-26672 > URL: https://issues.apache.org/jira/browse/SPARK-26672
[jira] [Created] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on data size
Wang, Gang created SPARK-26375: -- Summary: Rule PruneFileSourcePartitions should be fired before any other rules based on data size Key: SPARK-26375 URL: https://issues.apache.org/jira/browse/SPARK-26375 Project: Spark Issue Type: Improvement Components: Optimizer Affects Versions: 2.3.0 Reporter: Wang, Gang In Catalyst, some optimizer rules are based on table statistics, such as rule ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In these rules, statistics accuracy is crucial. However, currently all these rules are fired before partition pruning, which may yield inaccurate statistics.
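A toy example of how pre-pruning statistics can mislead a cost-based rule; the partition names and row counts are invented for illustration:

```python
# Per-partition row counts for a date-partitioned table.
partition_rows = {"2018-01": 1_000_000, "2018-02": 1_200_000, "2018-03": 5_000}

def table_rows(partitions):
    # Row-count statistic a join-reorder rule would see.
    return sum(partition_rows[p] for p in partitions)

# Statistic seen when the rule fires BEFORE partition pruning:
before_pruning = table_rows(list(partition_rows))
# Statistic AFTER PruneFileSourcePartitions keeps only dt = '2018-03':
after_pruning = table_rows(["2018-03"])

assert before_pruning == 2_205_000  # looks like a big table
assert after_pruning == 5_000       # actually tiny after the filter
```

A 440x overestimate like this can flip decisions such as join order or broadcast-vs-shuffle, which is why pruning should run first.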
[jira] [Updated] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
[ https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-26375: --- Summary: Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics (was: Rule PruneFileSourcePartitions should be fired before any other rules based on data size) > Rule PruneFileSourcePartitions should be fired before any other rules based > on table statistics > --- > > Key: SPARK-26375 > URL: https://issues.apache.org/jira/browse/SPARK-26375 > Project: Spark > Issue Type: Improvement > Components: Optimizer >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > > In catalyst, some optimize rules are base on table statistics, like rule > ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In > these rules, statistics accuracy are crucial. While, currently all these > rules are fired before partition pruning, which may get inaccurate statistics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
[ https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-26375: --- Description: In catalyst, some optimize rules are base on table statistics, like rule ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In these rules, statistics accuracy are crucial. While, currently all these rules are fired before partition pruning, which may result in inaccurate statistics. (was: In catalyst, some optimize rules are base on table statistics, like rule ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In these rules, statistics accuracy are crucial. While, currently all these rules are fired before partition pruning, which may get inaccurate statistics.) > Rule PruneFileSourcePartitions should be fired before any other rules based > on table statistics > --- > > Key: SPARK-26375 > URL: https://issues.apache.org/jira/browse/SPARK-26375 > Project: Spark > Issue Type: Improvement > Components: Optimizer >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > > In catalyst, some optimize rules are base on table statistics, like rule > ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In > these rules, statistics accuracy are crucial. While, currently all these > rules are fired before partition pruning, which may result in inaccurate > statistics. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25401) Reorder the required ordering to match the table's output ordering for bucket join
[ https://issues.apache.org/jira/browse/SPARK-25401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712873#comment-16712873 ] Wang, Gang commented on SPARK-25401: Yeah, I think so. And please make sure the outputOrdering of SortMergeJoin is aligned with the reordered keys. > Reorder the required ordering to match the table's output ordering for bucket > join > -- > > Key: SPARK-25401 > URL: https://issues.apache.org/jira/browse/SPARK-25401
[jira] [Updated] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-25411: --- Description: In our PROD environment, there are some partitioned fact tables, which are all quite huge. To accelerate join execution, we need make them also bucketed. Than comes the problem, if the bucket number is large enough, there may be too many files(files count = bucket number * partition count), which may bring pressure to the HDFS. And if the bucket number is small, Spark will launch equal number of tasks to read/write it. So, can we implement a new partition support range values, just like range partition in Oracle/MySQL ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html]). Say, we can partition by a date column, and make every two months as a partition, or partitioned by a integer column, make interval of 1 as a partition. Ideally, feature like range partition should be implemented in Hive. While, it's been always hard to update Hive version in a prod environment, and much lightweight and flexible if we implement it in Spark. was: In our PROD environment, there are some partitioned fact tables, which are all quite huge. To accelerate join execution, we need make them also bucketed. Than comes the problem, if the bucket number is large enough, there may be two many files(files count = bucket number * partition count), which may bring pressure to the HDFS. And if the bucket number is small, Spark will launch equal number of tasks to read/write it. So, can we implement a new partition support range values, just like range partition in Oracle/MySQL ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html]). Say, we can partition by a date column, and make every two months as a partition, or partitioned by a integer column, make interval of 1 as a partition. Ideally, feature like range partition should be implemented in Hive. 
While, it's been always hard to update Hive version in a prod environment, and much lightweight and flexible if we implement it in Spark. > Implement range partition in Spark > -- > > Key: SPARK-25411 > URL: https://issues.apache.org/jira/browse/SPARK-25411 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > > In our PROD environment, there are some partitioned fact tables, all of which > are quite large. To accelerate join execution, we need to make them bucketed > as well. Then comes the problem: if the bucket number is large, there may be > too many files (file count = bucket number * partition count), which may put > pressure on HDFS. And if the bucket number is small, Spark will launch an > equal number of tasks to read/write it. > > So, can we implement a new partition type that supports range values, just > like range partitioning in Oracle/MySQL > ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html])? > Say, we could partition by a date column and make every two months a > partition, or partition by an integer column with an interval of 1 per > partition. > > Ideally, a feature like range partitioning should be implemented in Hive. > However, it has always been hard to upgrade the Hive version in a production > environment, and it would be much more lightweight and flexible to implement > it in Spark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
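The interval scheme requested above can be sketched in a few lines of plain Python: a hypothetical helper that maps a column value to a range-partition identifier, so every fixed-width interval (e.g. two months, or an integer step of 1) lands in its own partition. The function names are illustrative only and are not part of any Spark or Hive API.

```python
from datetime import date

def range_partition_id(value, origin, interval):
    """Map a numeric value to an interval-based range partition.

    Every `interval`-wide bucket starting at `origin` becomes one
    partition, mirroring RANGE partitioning in Oracle/MySQL.
    """
    return (value - origin) // interval

def date_partition_id(d, months_per_partition=2):
    """Map a date to a partition covering `months_per_partition` months."""
    total_months = d.year * 12 + (d.month - 1)
    return total_months // months_per_partition

# An integer column with interval 1: each value gets its own partition.
assert range_partition_id(42, origin=0, interval=1) == 42
# A date column, two months per partition: Jan/Feb 2024 share a partition.
assert date_partition_id(date(2024, 1, 15)) == date_partition_id(date(2024, 2, 20))
```

Because partition membership is computed from the value, the writer never needs one directory per distinct value, which is what keeps the file count bounded compared with the bucket-number approach described above.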
[jira] [Updated] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-25411: --- Description: In our production environment, there are some partitioned fact tables, all of which are quite large. To accelerate join execution, we need to make them bucketed as well. Then comes the problem: if the bucket number is large, there may be too many files (file count = bucket number * partition count), which may put pressure on HDFS. And if the bucket number is small, Spark will launch an equal number of tasks to read/write it. So, can we implement a new partition type that supports range values, just like range partitioning in Oracle/MySQL ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html])? Say, we could partition by a date column and make every two months a partition, or partition by an integer column with an interval of 1 per partition. Ideally, a feature like range partitioning should be implemented in Hive. However, it has always been hard to upgrade the Hive version in a production environment, and it would be much more lightweight and flexible to implement it in Spark. was: In our PROD environment, there are some partitioned fact tables, which are all quite huge. To accelerate join execution, we need make them also bucketed. Than comes the problem, if the bucket number is large enough, there may be too many files(files count = bucket number * partition count), which may bring pressure to the HDFS. And if the bucket number is small, Spark will launch equal number of tasks to read/write it. So, can we implement a new partition support range values, just like range partition in Oracle/MySQL ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html]). Say, we can partition by a date column, and make every two months as a partition, or partitioned by a integer column, make interval of 1 as a partition. Ideally, feature like range partition should be implemented in Hive. 
While, it's been always hard to update Hive version in a prod environment, and much lightweight and flexible if we implement it in Spark. > Implement range partition in Spark > -- > > Key: SPARK-25411 > URL: https://issues.apache.org/jira/browse/SPARK-25411 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > > In our production environment, there are some partitioned fact tables, all of > which are quite large. To accelerate join execution, we need to make them > bucketed as well. Then comes the problem: if the bucket number is large, there > may be too many files (file count = bucket number * partition count), which > may put pressure on HDFS. And if the bucket number is small, Spark will > launch an equal number of tasks to read/write it. > > So, can we implement a new partition type that supports range values, just > like range partitioning in Oracle/MySQL > ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html])? > Say, we could partition by a date column and make every two months a > partition, or partition by an integer column with an interval of 1 per > partition. > > Ideally, a feature like range partitioning should be implemented in Hive. > However, it has always been hard to upgrade the Hive version in a production > environment, and it would be much more lightweight and flexible to implement > it in Spark.
[jira] [Resolved] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
[ https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang resolved SPARK-26375. Resolution: Not A Problem > Rule PruneFileSourcePartitions should be fired before any other rules based > on table statistics > --- > > Key: SPARK-26375 > URL: https://issues.apache.org/jira/browse/SPARK-26375 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > > In Catalyst, some optimizer rules are based on table statistics, such as > ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In > these rules, statistics accuracy is crucial. However, currently all these > rules are fired before partition pruning, which may result in inaccurate > statistics.
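The ordering concern behind this ticket can be illustrated with a toy cost model (plain Python, not Spark's actual estimation code; all numbers and names are made up): a cost-based rule that reads statistics before partition pruning sees the full-table row count, while firing pruning first would hand it the much smaller post-pruning estimate.

```python
def estimated_rows(total_rows, total_partitions, partitions_after_pruning):
    """Row-count estimate assuming rows are spread evenly across partitions."""
    return total_rows * partitions_after_pruning // total_partitions

# A fact table with 1,000 daily partitions, but the query touches one day.
full_estimate = estimated_rows(1_000_000_000, 1000, 1000)  # pruning not fired yet
pruned_estimate = estimated_rows(1_000_000_000, 1000, 1)   # pruning fired first

# A cost-based join-reorder rule seeing the full estimate may wrongly treat
# the fact table as too large to broadcast; the pruned estimate disagrees.
broadcast_threshold = 10_000_000
assert full_estimate > broadcast_threshold
assert pruned_estimate < broadcast_threshold
```

The "Not A Problem" resolution below reflects that in practice the partition filter is still fed through filter estimation, so the rules do see a reduced estimate even before physical pruning.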
[jira] [Commented] (SPARK-26375) Rule PruneFileSourcePartitions should be fired before any other rules based on table statistics
[ https://issues.apache.org/jira/browse/SPARK-26375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724827#comment-16724827 ] Wang, Gang commented on SPARK-26375: Should be okay; a filter on partition columns is also treated as a normal filter, and its output statistics are estimated in class FilterEstimation. > Rule PruneFileSourcePartitions should be fired before any other rules based > on table statistics > --- > > Key: SPARK-26375 > URL: https://issues.apache.org/jira/browse/SPARK-26375 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > > In Catalyst, some optimizer rules are based on table statistics, such as > ReorderJoin, in which star schema is detected, and CostBasedJoinReorder. In > these rules, statistics accuracy is crucial. However, currently all these > rules are fired before partition pruning, which may result in inaccurate > statistics.
[jira] [Commented] (SPARK-25411) Implement range partition in Spark
[ https://issues.apache.org/jira/browse/SPARK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885028#comment-16885028 ] Wang, Gang commented on SPARK-25411: I referred to Oracle DDLs; they are quite close to PostgreSQL's. > Implement range partition in Spark > -- > > Key: SPARK-25411 > URL: https://issues.apache.org/jira/browse/SPARK-25411 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wang, Gang >Priority: Major > Attachments: range partition design doc.pdf > > > In our production environment, there are some partitioned fact tables, all of > which are quite large. To accelerate join execution, we need to make them > bucketed as well. Then comes the problem: if the bucket number is large, there > may be too many files (file count = bucket number * partition count), which > may put pressure on HDFS. And if the bucket number is small, Spark will > launch an equal number of tasks to read/write it. > > So, can we implement a new partition type that supports range values, just > like range partitioning in Oracle/MySQL > ([https://docs.oracle.com/cd/E17952_01/mysql-5.7-en/partitioning-range.html])? > Say, we could partition by a date column and make every two months a > partition, or partition by an integer column with an interval of 1 per > partition. > > Ideally, a feature like range partitioning should be implemented in Hive. > However, it has always been hard to upgrade the Hive version in a production > environment, and it would be much more lightweight and flexible to implement > it in Spark.
[jira] [Created] (SPARK-29189) Add an option to ignore block locations when listing file
Wang, Gang created SPARK-29189: -- Summary: Add an option to ignore block locations when listing file Key: SPARK-29189 URL: https://issues.apache.org/jira/browse/SPARK-29189 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Wang, Gang In our PROD env, we have a pure Spark cluster, where computation is separated from the storage layer; I think this is also pretty common. In such a deployment mode, data locality is never reachable, and there are some configurations in the Spark scheduler to reduce the waiting time for data locality (e.g. "spark.locality.wait"). The problem is that, in the file listing phase, the location information of all the files, with all the blocks inside each file, is fetched from the distributed file system. In a PROD environment, a table can be so huge that even fetching all this location information can take tens of seconds. To improve this scenario, Spark should provide an option to ignore data locality entirely: all we need in the file listing phase are the file locations, without any block location information. We made a benchmark in our PROD env; after ignoring the block locations, we got a pretty huge improvement. ||Table Size||Total File Number||Total Block Number||List File With Block Location Duration||List File Without Block Location Duration|| |22.6T|3|12|16.841s|1.730s| |28.8 T|42001|148964|10.099s|2.858s| |3.4 T|2| 2|5.833s|4.881s|
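A rough model of why skipping block locations helps (illustrative Python, not Spark's or HDFS's actual listing code; the replica count and helper name are assumptions): with block locations, the listing returns roughly one record per block per replica, while without them it returns one record per file. For tables with hundreds of thousands of blocks, that difference dominates listing time.

```python
def listing_records(num_files, blocks_per_file, replicas=3,
                    with_block_locations=True):
    """Approximate number of location records fetched during file listing."""
    if with_block_locations:
        # One record per (block, replica) pair across every file.
        return num_files * blocks_per_file * replicas
    # Only one record per file is needed when locality is ignored.
    return num_files

# Roughly the shape of the 28.8 TB table in the benchmark above:
# 42,001 files and ~148,964 blocks, i.e. about 3-4 blocks per file.
with_locs = listing_records(42_001, 4)
without_locs = listing_records(42_001, 4, with_block_locations=False)
assert without_locs == 42_001
assert with_locs // without_locs == 12  # ~12x more records to fetch
```

The model is consistent with the benchmark's trend: tables with many blocks per file see the biggest win, while the 3.4 T table with very few blocks improves only modestly.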
[jira] [Updated] (SPARK-29189) Add an option to ignore block locations when listing file
[ https://issues.apache.org/jira/browse/SPARK-29189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-29189: --- Description: In our PROD env, we have a pure Spark cluster, where computation is separated from the storage layer; I think this is also pretty common. In such a deployment mode, data locality is never reachable, and there are some configurations in the Spark scheduler to reduce the waiting time for data locality (e.g. "spark.locality.wait"). The problem is that, in the file listing phase, the location information of all the files, with all the blocks inside each file, is fetched from the distributed file system. In a PROD environment, a table can be so huge that even fetching all this location information can take tens of seconds. To improve this scenario, Spark should provide an option to ignore data locality entirely: all we need in the file listing phase are the file locations, without any block location information. We made a benchmark in our PROD env; after ignoring the block locations, we got a pretty huge improvement. ||Table Size||Total File Number||Total Block Number||List File Duration(With Block Location)||List File Duration(Without Block Location)|| |22.6T|3|12|16.841s|1.730s| |28.8 T|42001|148964|10.099s|2.858s| |3.4 T|2| 2|5.833s|4.881s| was: In our PROD env, we have a pure Spark cluster, I think this is also pretty common, where computation is separated from storage layer. In such deploy mode, data locality is never reachable. And there are some configurations in Spark scheduler to reduce waiting time for data locality(e.g. "spark.locality.wait"). While, problem is that, in listing file phase, the location informations of all the files, with all the blocks inside each file, are all fetched from the distributed file system. Actually, in a PROD environment, a table can be so huge that even fetching all these location informations need take tens of seconds. 
To improve such scenario, Spark need provide an option, where data locality can be totally ignored, all we need in the listing file phase are the files locations, without any block location informations. And we made a benchmark in our PROD env, after ignore the block locations, we got a pretty huge improvement. ||Table Size||Total File Number||Total Block Number||List File With Block Location Duration||List File Without Block Location Duration|| |22.6T|3|12|16.841s|1.730s| |28.8 T|42001|148964|10.099s|2.858s| |3.4 T|2| 2|5.833s|4.881s| > Add an option to ignore block locations when listing file > - > > Key: SPARK-29189 > URL: https://issues.apache.org/jira/browse/SPARK-29189 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wang, Gang >Priority: Major > > In our PROD env, we have a pure Spark cluster, where computation is separated > from the storage layer; I think this is also pretty common. In such a > deployment mode, data locality is never reachable. > And there are some configurations in the Spark scheduler to reduce the > waiting time for data locality (e.g. "spark.locality.wait"). The problem is > that, in the file listing phase, the location information of all the files, > with all the blocks inside each file, is fetched from the distributed file > system. In a PROD environment, a table can be so huge that even fetching > all this location information can take tens of seconds. > To improve this scenario, Spark should provide an option to ignore data > locality entirely: all we need in the file listing phase are the file > locations, without any block location information. > > We made a benchmark in our PROD env; after ignoring the block locations, we > got a pretty huge improvement. 
> ||Table Size||Total File Number||Total Block Number||List File Duration(With > Block Location)||List File Duration(Without Block Location)|| > |22.6T|3|12|16.841s|1.730s| > |28.8 T|42001|148964|10.099s|2.858s| > |3.4 T|2| 2|5.833s|4.881s| >
[jira] [Created] (SPARK-32464) Support skew handling on join with one side that has no query stage
Wang, Gang created SPARK-32464: -- Summary: Support skew handling on join with one side that has no query stage Key: SPARK-32464 URL: https://issues.apache.org/jira/browse/SPARK-32464 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Wang, Gang In our production environment, there are many bucket tables, which are used to join with other tables, and there are some skewed joins now and then. However, in the current implementation, skew join handling can only be applied when both sides of an SMJ are QueryStages, so it is not able to deal with such cases.
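The skew handling being requested can be sketched independently of query stages (a toy model in plain Python, not the AQE implementation; the threshold factor and function name are assumptions): split any partition whose size exceeds a multiple of the median into roughly median-sized pieces, so one huge bucket no longer serializes a single task.

```python
def split_skewed_partitions(sizes, factor=5):
    """Split partitions larger than factor * median into ~median-sized pieces.

    Returns a list of (partition_index, num_splits) describing how each
    input partition would be subdivided among tasks.
    """
    ordered = sorted(sizes)
    median = ordered[len(ordered) // 2]
    threshold = median * factor
    plan = []
    for i, size in enumerate(sizes):
        if size > threshold:
            # Ceil-divide so every split stays at or under ~median size.
            plan.append((i, -(-size // median)))
        else:
            plan.append((i, 1))
    return plan

# Partition 2 is heavily skewed and would be read by 10 subtasks instead of 1.
sizes = [100, 100, 1000, 100]
plan = split_skewed_partitions(sizes)
assert plan[2] == (2, 10)
assert plan[0] == (0, 1)
```

On the bucketed-table side the splits would have to be matched by duplicating the corresponding partition of the other join side, which is exactly the step that currently requires both sides to be query stages.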
[jira] [Updated] (SPARK-32464) Support skew handling on join that has one side with no query stage
[ https://issues.apache.org/jira/browse/SPARK-32464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang, Gang updated SPARK-32464: --- Summary: Support skew handling on join that has one side with no query stage (was: Support skew handling on join with one side that has no query stage) > Support skew handling on join that has one side with no query stage > --- > > Key: SPARK-32464 > URL: https://issues.apache.org/jira/browse/SPARK-32464 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wang, Gang >Priority: Major > > In our production environment, there are many bucket tables, which are used > to join with other tables, and there are some skewed joins now and then. > However, in the current implementation, skew join handling can only be > applied when both sides of an SMJ are QueryStages, so it is not able > to deal with such cases.
[jira] [Commented] (SPARK-32464) Support skew handling on join that has one side with no query stage
[ https://issues.apache.org/jira/browse/SPARK-32464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17166071#comment-17166071 ] Wang, Gang commented on SPARK-32464: A PR: [https://github.com/apache/spark/pull/29266] > Support skew handling on join that has one side with no query stage > --- > > Key: SPARK-32464 > URL: https://issues.apache.org/jira/browse/SPARK-32464 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wang, Gang >Priority: Major > > In our production environment, there are many bucket tables, which are used > to join with other tables, and there are some skewed joins now and then. > However, in the current implementation, skew join handling can only be > applied when both sides of an SMJ are QueryStages, so it is not able > to deal with such cases.