[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468932#comment-15468932 ] Ryan Blue commented on SPARK-17396: --- I opened a PR with a fix. It still uses a ForkJoinPool because the thread pool task support is marked as deprecated in Scala. So I guess we should use fork/join. > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create a external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happend when table partitions number greater than 10. > 5. Test Code > {code:lang=java} > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION '${location}/index=11' > PARTITION (index=12)LOCATION '${location}/index=12' 
> PARTITION (index=13)LOCATION '${location}/index=13' > PARTITION (index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468929#comment-15468929 ] Apache Spark commented on SPARK-17396: -- User 'rdblue' has created a pull request for this issue: https://github.com/apache/spark/pull/14985 > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create a external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happend when table partitions number greater than 10. > 5. Test Code > {code:lang=java} > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION '${location}/index=11' > PARTITION (index=12)LOCATION '${location}/index=12' > PARTITION (index=13)LOCATION '${location}/index=13' > 
PARTITION (index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467956#comment-15467956 ] Ryan Blue commented on SPARK-17396: --- I'll put together a patch for this with a shared executor service. Although ForkJoinPool isn't to blame, from what I've read about it we probably should be using a different implementation. We don't need the fork/join task pattern and it would be much better to have reliable (and documented) semantics. > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create a external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happend when table partitions number greater than 10. > 5. 
Test Code > {code:lang=java} > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION '${location}/index=11' > PARTITION (index=12)LOCATION '${location}/index=12' > PARTITION (index=13)LOCATION '${location}/index=13' > PARTITION (index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For 
additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15466879#comment-15466879 ] Sean Owen commented on SPARK-17396: --- Yeah, [~rdblue] is right on the mark then. I agree; I wasn't clear on how it manages threads. A shared {{ExecutorService}} -- a 'cached' version with some reasonable maximum number of threads, like 8 -- should be good. > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create an external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happened when table partitions number greater than 10. > 5. Test Code > {code:lang=java} > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION '${location}/index=11' > 
PARTITION (index=12)LOCATION '${location}/index=12' > PARTITION (index=13)LOCATION '${location}/index=13' > PARTITION (index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15466180#comment-15466180 ] pin_zhang commented on SPARK-17396: --- "Thread-1902" daemon prio=6 tid=0x14078800 nid=0x3a6c runnable [0x38d5e000] "Thread-1901" daemon prio=6 tid=0x0c64f800 nid=0x32fc runnable [0x191ef000] "Thread-1900" daemon prio=6 tid=0x14249800 nid=0x263c runnable [0x4c73e000] "Thread-1899" daemon prio=6 tid=0x14244000 nid=0x189c runnable [0x17c7e000] "Thread-1898" daemon prio=6 tid=0x0d96a800 nid=0x3e54 runnable [0x4c5ef000] "ForkJoinPool-120-worker-1" daemon prio=6 tid=0x1407d000 nid=0x2234 waiting for monitor entry [0x4c31e000] "ForkJoinPool-120-worker-3" daemon prio=6 tid=0x13a64000 nid=0x1f0c waiting for monitor entry [0x4c0de000] "ForkJoinPool-120-worker-5" daemon prio=6 tid=0x13a75800 nid=0x1660 waiting for monitor entry [0x4241e000] "ForkJoinPool-120-worker-7" daemon prio=6 tid=0x13d6c000 nid=0x117c waiting for monitor entry [0x4bece000] "ForkJoinPool-120-worker-9" daemon prio=6 tid=0x14233800 nid=0x2a20 waiting for monitor entry [0x4bd3e000] "ForkJoinPool-120-worker-11" daemon prio=6 tid=0x1423f800 nid=0x3568 waiting for monitor entry [0x4afae000] "ForkJoinPool-120-worker-13" daemon prio=6 tid=0x1424e000 nid=0x378c waiting for monitor entry [0x4bc0e000] "ForkJoinPool-120-worker-15" daemon prio=6 tid=0x14238000 nid=0x1b8c waiting for monitor entry [0x18dfd000] "ForkJoinPool-119-worker-1" daemon prio=6 tid=0x13d74800 nid=0x29a0 waiting for monitor entry [0x4bade000] "ForkJoinPool-119-worker-3" daemon prio=6 tid=0x12cd4000 nid=0x18a0 in Object.wait() [0x4b9ae000] "ForkJoinPool-119-worker-7" daemon prio=6 tid=0x12cd3000 nid=0x15ec waiting for monitor entry [0x4b87d000] "ForkJoinPool-119-worker-5" daemon prio=6 tid=0x13bbd800 nid=0x2c24 waiting for monitor entry [0x4b76d000] "ForkJoinPool-119-worker-9" daemon prio=6 tid=0x13bc9800 nid=0x3d78 waiting for monitor entry [0x2acae000] "ForkJoinPool-119-worker-11" daemon prio=6 
tid=0x0d9eb000 nid=0x3f40 waiting for monitor entry [0x4b57e000] "ForkJoinPool-119-worker-13" daemon prio=6 tid=0x0d9e4800 nid=0x286c waiting for monitor entry [0x4b40e000] "ForkJoinPool-119-worker-15" daemon prio=6 tid=0x0d9e9000 nid=0x2304 in Object.wait() [0x194de000] "ForkJoinPool-118-worker-1" daemon prio=6 tid=0x14077000 nid=0x3a50 runnable [0x393dd000] "ForkJoinPool-118-worker-3" daemon prio=6 tid=0x1407a000 nid=0x1dc0 runnable [0x2331d000] "ForkJoinPool-118-worker-5" daemon prio=6 tid=0x0d2f9000 nid=0x2990 runnable [0x1b6fd000] "ForkJoinPool-118-worker-7" daemon prio=6 tid=0x0d2df800 nid=0x3bb4 runnable [0x4a9dd000] "ForkJoinPool-118-worker-9" daemon prio=6 tid=0x0d2f7800 nid=0x37e4 waiting for monitor entry [0x2bf5e000] "ForkJoinPool-118-worker-11" daemon prio=6 tid=0x12648000 nid=0x2878 runnable [0x2b26d000] "ForkJoinPool-118-worker-13" daemon prio=6 tid=0x12646000 nid=0x4cc waiting for monitor entry [0x183de000] "ForkJoinPool-118-worker-15" daemon prio=6 tid=0x12647800 nid=0x30c8 waiting for monitor entry [0x2bd3d000] "ForkJoinPool-117-worker-5" daemon prio=6 tid=0x12b5c800 nid=0x3510 waiting for monitor entry [0x4b2be000] "ForkJoinPool-117-worker-1" daemon prio=6 tid=0x12b5d000 nid=0x36b8 waiting for monitor entry [0x4b11e000] "ForkJoinPool-117-worker-3" daemon prio=6 tid=0x12eac800 nid=0x32d4 in Object.wait() [0x4acae000] "ForkJoinPool-117-worker-7" daemon prio=6 tid=0x12ea9800 nid=0x16c4 waiting for monitor entry [0x4ab1e000] "ForkJoinPool-117-worker-9" daemon prio=6 tid=0x12e9b000 nid=0x1e44 waiting for monitor entry [0x2162e000] "ForkJoinPool-117-worker-11" daemon prio=6 tid=0x13bcc000 nid=0x37f4 waiting for monitor entry [0x40dee000] "ForkJoinPool-117-worker-13" daemon prio=6 tid=0x13bcb000 nid=0x361c in Object.wait() [0x35dbe000] "ForkJoinPool-117-worker-15" daemon prio=6 tid=0x13bca800 nid=0x3344 in Object.wait() [0x2c0ce000] "ForkJoinPool-116-worker-1" daemon prio=6 tid=0x13bc9000 nid=0x3a34 runnable [0x4867d000] "ForkJoinPool-116-worker-3" 
daemon prio=6 tid=0x13bc8000 nid=0x1c10 in Object.wait() [0x4a8be000] "ForkJoinPool-116-worker-7" daemon prio=6 tid=0x13bc7800 nid=0x2910 waiting on condition [0x45e7f000] "ForkJoinPool-116-worker-5" daemon prio=6 tid=0x13bc6800 nid=0x3b1c waiting for monitor entry
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465375#comment-15465375 ] Ryan Blue commented on SPARK-17396: --- I'm not sure that the ForkJoinPool is to blame. Each partition in a Hive table becomes a partition RDD in the union. Each UnionRDD creates a new ForkJoinPool (parallelism=8) to list the files in a partition in parallel. I think it is more likely that the problem is that we don't use a shared ForkJoinPool, but instead create a new one for each RDD that doesn't get cleaned up until the UnionRDD is cleaned. Even if the ForkJoinPool can't reuse threads and we're getting more than 8, it wouldn't grow to thousands if we weren't creating so many pools. But, I agree that we should consider a different executor service. I've been reading up on ForkJoinPool and it looks like it's not a great implementation; just trying to determine the maximum number of threads it will use was painfully undocumented. [~pin_zhang], can you post a list of the thread names? We can use the names to determine how many threads there are in each pool. That should tell us whether we should use a different executor service in addition to using a shared pool. > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create a external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happend when table partitions number greater than 10. > 5. 
Test Code > {code:lang=java} > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION '${location}/index=11' > PARTITION (index=12)LOCATION '${location}/index=12' > PARTITION (index=13)LOCATION '${location}/index=13' > PARTITION (index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For 
additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464627#comment-15464627 ] Sean Owen commented on SPARK-17396: --- [~rdblue] what do you think of this? I think there's a good point here that ForkJoinPool won't limit the number of threads it uses. If that's true, then this would probably need to switch to a normal Java executor service that can cap threads. > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create an external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happened when table partitions number greater than 10. > 5. Test Code > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION 
'${location}/index=11' > PARTITION (index=12)LOCATION '${location}/index=12' > PARTITION (index=13)LOCATION '${location}/index=13' > PARTITION (index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464602#comment-15464602 ] pin_zhang commented on SPARK-17396: --- 1. Thousands of threads are created; they look like: "ForkJoinPool-20-worker-9" #329 daemon prio=5 os_prio=0 tid=0x0ac87000 nid=0x3d43 waiting on condition [0x5069f000] "ForkJoinPool-19-worker-3" #324 daemon prio=5 os_prio=0 tid=0x0ae6 nid=0x3c2a waiting on condition [0x5039c000] 2. The threads appear to be created by UnionRDD > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create an external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happened when table partitions number greater than 10. > 5. 
Test Code > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION '${location}/index=11' > PARTITION (index=12)LOCATION '${location}/index=12' > PARTITION (index=13)LOCATION '${location}/index=13' > PARTITION (index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: 
issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17396) Threads number keep increasing when query on external CSV partitioned table
[ https://issues.apache.org/jira/browse/SPARK-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464525#comment-15464525 ] Sean Owen commented on SPARK-17396: --- Can you be more specific? What pool, and what problem do you observe? What are the threads doing? > Threads number keep increasing when query on external CSV partitioned table > --- > > Key: SPARK-17396 > URL: https://issues.apache.org/jira/browse/SPARK-17396 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0 >Reporter: pin_zhang > > 1. Create an external partitioned table row format CSV > 2. Add 16 partitions to the table > 3. Run SQL "select count(*) from test_csv" > 4. ForkJoinThread number keep increasing > This happened when table partitions number greater than 10. > 5. Test Code > import org.apache.spark.SparkConf > import org.apache.spark.SparkContext > import org.apache.spark.sql.hive.HiveContext > object Bugs { > def main(args: Array[String]): Unit = { > val location = "file:///g:/home/test/csv" > val create = s"""CREATE EXTERNAL TABLE test_csv > (ID string, SEQ string ) > PARTITIONED BY(index int) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' > LOCATION "${location}" > """ > val add_part = s""" > ALTER TABLE test_csv ADD > PARTITION (index=1)LOCATION '${location}/index=1' > PARTITION (index=2)LOCATION '${location}/index=2' > PARTITION (index=3)LOCATION '${location}/index=3' > PARTITION (index=4)LOCATION '${location}/index=4' > PARTITION (index=5)LOCATION '${location}/index=5' > PARTITION (index=6)LOCATION '${location}/index=6' > PARTITION (index=7)LOCATION '${location}/index=7' > PARTITION (index=8)LOCATION '${location}/index=8' > PARTITION (index=9)LOCATION '${location}/index=9' > PARTITION (index=10)LOCATION '${location}/index=10' > PARTITION (index=11)LOCATION '${location}/index=11' > PARTITION (index=12)LOCATION '${location}/index=12' > PARTITION (index=13)LOCATION '${location}/index=13' > PARTITION 
(index=14)LOCATION '${location}/index=14' > PARTITION (index=15)LOCATION '${location}/index=15' > PARTITION (index=16)LOCATION '${location}/index=16' > """ > val conf = new SparkConf().setAppName("scala").setMaster("local[2]") > conf.set("spark.sql.warehouse.dir", "file:///g:/home/warehouse") > val ctx = new SparkContext(conf) > val hctx = new HiveContext(ctx) > hctx.sql(create) > hctx.sql(add_part) > for (i <- 1 to 6) { > new Query(hctx).start() > } > } > class Query(htcx: HiveContext) extends Thread { > setName("Query-Thread") > override def run = { > while (true) { > htcx.sql("select count(*) from test_csv").show() > Thread.sleep(100) > } > } > } > } -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org