Spark SQL reads all leaf directories on a partitioned Hive table
Hi,

I am using Spark SQL 2.3.3 to read a Hive table which is partitioned by day, hour, platform, request_status and is_sampled. The underlying data is in Parquet format on HDFS.

Here is the SQL query to read just *one partition*:

```
spark.sql("""
  SELECT rtb_platform_id, SUM(e_cpm)
  FROM raw_logs.fact_request
  WHERE day = '2019-08-01'
    AND hour = '00'
    AND platform = 'US'
    AND request_status = '3'
    AND is_sampled = 1
  GROUP BY rtb_platform_id
""").show
```

However, from the Spark web UI, the stage description shows:

```
Listing leaf files and directories for 201616 paths:
viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0, ...
```

It seems the job is listing all of the partitions of the table, and it takes far too long for just one partition.

One workaround is to use the `spark.read.parquet` API to read the Parquet files directly, since Spark is partition-aware for partitioned directories. Still, I would like to know whether there is a way to leverage partition pruning through Hive when using the `spark.sql` API.

Any help is highly appreciated! Thank you.

-- Hao Ren
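For reference, the workaround mentioned above might look roughly like the following sketch. The base path is taken from the paths shown in the web UI and the column names from the query; the `basePath` option and the exact call shape are illustrative, not taken from the original post.

```scala
// Sketch of the workaround: bypass the Hive metastore listing by reading
// the single partition directory directly with spark.read.parquet.
object OnePartitionRead {
  // Build the partition directory path from the predicate values.
  def partitionPath(base: String, day: String, hour: String,
                    platform: String, requestStatus: String, isSampled: Int): String =
    s"$base/day=$day/hour=$hour/platform=$platform/request_status=$requestStatus/is_sampled=$isSampled"

  def main(args: Array[String]): Unit = {
    val base = "viewfs://root/user/bilogs/logs/fact_request"
    val onePartition = partitionPath(base, "2019-08-01", "00", "US", "3", 1)
    println(onePartition)

    // With a SparkSession `spark` in scope, the partition could then be read
    // directly (sketch only; needs a cluster and the real table layout):
    //
    // import org.apache.spark.sql.functions.sum
    // spark.read
    //   .option("basePath", base) // keeps the partition columns in the schema
    //   .parquet(onePartition)
    //   .groupBy("rtb_platform_id")
    //   .agg(sum("e_cpm"))
    //   .show()
  }
}
```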
Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected
Yes, it is. You can define a udf like that. Basically, it is a udf Int => Int whose closure contains a non-serializable object. The latter should cause a Task not serializable exception.

Hao

On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <bablo...@gmail.com> wrote:

> Hello Hao Ren,
>
> Doesn't the code...
>
> val add = udf {
>   (a: Int) => a + notSer.value
> }
>
> mean a UDF function that is Int => Int?
>
> Thanks,
> Muthu
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <inv...@gmail.com> wrote:
>
>> I am playing with Spark 2.0. What I tried to test is:
>>
>> Create a UDF in which there is a non-serializable object.
>> What I expected is that when this UDF is called while materializing the
>> dataFrame where the UDF is used in "select", a Task not serializable
>> exception should be thrown. It also depends on which "action" is called
>> on that dataframe.
>>
>> Here is the code for reproducing the problem:
>>
>> object DataFrameSerDeTest extends App {
>>
>>   class A(val value: Int) // It is not serializable
>>
>>   def run() = {
>>     val spark = SparkSession
>>       .builder()
>>       .appName("DataFrameSerDeTest")
>>       .master("local[*]")
>>       .getOrCreate()
>>
>>     import org.apache.spark.sql.functions.udf
>>     import spark.sqlContext.implicits._
>>
>>     val notSer = new A(2)
>>     val add = udf {
>>       (a: Int) => a + notSer.value
>>     }
>>     val df = spark.createDataFrame(Seq(
>>       (1, 2),
>>       (2, 2),
>>       (3, 2),
>>       (4, 2)
>>     )).toDF("key", "value")
>>       .select($"key", add($"value").as("added"))
>>
>>     // *It should not work because the udf contains a
>>     // non-serializable object, but it works*
>>     df.show()
>>
>>     // *It does not work as expected
>>     // (org.apache.spark.SparkException: Task not serializable)*
>>     df.filter($"key" === 2).show()
>>   }
>>
>>   run()
>> }
>>
>> Also, I tried collect(), count(), first(), limit(). All of them worked
>> without non-serializable exceptions.
>> It seems only filter() throws the exception. (feature or bug?)
>>
>> Any ideas? Or did I just mess things up?
>> Any help is highly appreciated.
>>
>> --
>> Hao Ren
>> Data Engineer @ leboncoin
>> Paris, France

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
[SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected
I am playing with Spark 2.0. What I tried to test is:

Create a UDF in which there is a non-serializable object.
What I expected is that when this UDF is called while materializing the dataFrame where the UDF is used in "select", a Task not serializable exception should be thrown. It also depends on which "action" is called on that dataframe.

Here is the code for reproducing the problem:

object DataFrameSerDeTest extends App {

  class A(val value: Int) // It is not serializable

  def run() = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameSerDeTest")
      .master("local[*]")
      .getOrCreate()

    import org.apache.spark.sql.functions.udf
    import spark.sqlContext.implicits._

    val notSer = new A(2)
    val add = udf {
      (a: Int) => a + notSer.value
    }
    val df = spark.createDataFrame(Seq(
      (1, 2),
      (2, 2),
      (3, 2),
      (4, 2)
    )).toDF("key", "value")
      .select($"key", add($"value").as("added"))

    // *It should not work because the udf contains a non-serializable object, but it works*
    df.show()

    // *It does not work as expected (org.apache.spark.SparkException: Task not serializable)*
    df.filter($"key" === 2).show()
  }

  run()
}

Also, I tried collect(), count(), first(), limit(). All of them worked without non-serializable exceptions. It seems only filter() throws the exception. (feature or bug?)

Any ideas? Or did I just mess things up? Any help is highly appreciated.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
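The underlying mechanism can be seen without Spark at all: a minimal sketch (plain Scala plus java.io, no Spark involved) showing that Java serialization of a closure capturing a non-serializable object throws NotSerializableException, which is the error Spark surfaces as "Task not serializable" when it ships a closure to executors.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class A(val value: Int) // does NOT extend Serializable, like the class in the post

object ClosureSerDemo {
  // Try to Java-serialize an object; report whether it succeeds.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val notSer = new A(2)
    val add: Int => Int = (a: Int) => a + notSer.value // closure captures notSer
    val plain: Int => Int = (a: Int) => a + 2          // captures nothing

    println(canSerialize(add))   // the capturing closure fails to serialize
    println(canSerialize(plain)) // the capture-free closure serializes fine
  }
}
```

This also suggests why only some actions fail: the exception can only appear on code paths where Spark actually serializes the closure.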
[MLlib] Term Frequency in TF-IDF seems incorrect
When computing term frequency, we can use either the HashingTF or CountVectorizer feature extractor. However, both of them just use the raw number of times a term appears in a document, which is not a true frequency. Actually, it should be divided by the length of the document. Is this intended behavior?

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
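To make the distinction concrete, here is a minimal sketch (plain Scala, no MLlib) of the two quantities: the raw count that HashingTF / CountVectorizer produce, versus the length-normalized frequency the post argues for.

```scala
object TermFreq {
  // Raw term counts: number of occurrences of each term in the document.
  def rawCounts(doc: Seq[String]): Map[String, Int] =
    doc.groupBy(identity).map { case (term, occs) => term -> occs.size }

  // Normalized term frequency: raw count divided by the document length.
  def normalizedTf(doc: Seq[String]): Map[String, Double] =
    rawCounts(doc).map { case (term, c) => term -> c.toDouble / doc.size }

  def main(args: Array[String]): Unit = {
    val doc = Seq("a", "b", "a", "a")
    println(rawCounts(doc))    // a -> 3, b -> 1
    println(normalizedTf(doc)) // a -> 0.75, b -> 0.25
  }
}
```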
SparkSQL can not extract values from UDT (like VectorUDT)
Hi,

Consider the following code using spark.ml to get the probability column on a data set:

model.transform(dataSet)
  .selectExpr("probability.values")
  .printSchema()

Note that "probability" is of `vector` type, which is a UDT with the following implementation:

class VectorUDT extends UserDefinedType[Vector] {

  override def sqlType: StructType = {
    // type: 0 = sparse, 1 = dense
    // We only use "values" for dense vectors, and "size", "indices", and "values" for sparse
    // vectors. The "values" field is nullable because we might want to add binary vectors later,
    // which uses "size" and "indices", but not "values".
    StructType(Seq(
      StructField("type", ByteType, nullable = false),
      StructField("size", IntegerType, nullable = true),
      StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
  }

  // ...
}

`values` is one of its attributes. However, it cannot be extracted. The first code snippet results in an exception from complexTypeExtractors:

org.apache.spark.sql.AnalysisException: Can't extract value from probability#743;
  at ...
  at ...
  at ...
  ...

Here is the code: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala#L49

It seems that the pattern matching does not take UDTs into consideration. Is this an intended limitation? If not, I would like to create a PR to fix it.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
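A common user-side workaround is to unwrap the UDT with a UDF instead of `selectExpr`. The sketch below mocks the vector type (MLlib is not on the classpath here); the Spark calls in the comment are the usual shape of this workaround, not code from the original post.

```scala
// Stand-in for org.apache.spark.ml.linalg.DenseVector, for illustration only.
case class MockDenseVector(values: Array[Double]) {
  def toArray: Array[Double] = values
}

object VectorUnwrap {
  // Same shape as the usual Spark workaround:
  //   val vecToArray = udf((v: Vector) => v.toArray)
  //   df.select(vecToArray($"probability").as("probability_values"))
  def unwrap(v: MockDenseVector): Array[Double] = v.toArray

  def main(args: Array[String]): Unit = {
    val prob = MockDenseVector(Array(0.1, 0.9))
    println(unwrap(prob).mkString(",")) // prints the raw values array
  }
}
```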
Re: [MLlib] BinaryLogisticRegressionSummary on test set
Thank you for the reply. I have created a jira issue and pinged mengxr. Here is the link:
https://issues.apache.org/jira/browse/SPARK-10691

I did not find jkbradley on jira, but I saw he is on github. BTW, should I create a pull request removing the private modifier, for further discussion?

Thx.

On Thu, Sep 17, 2015 at 6:44 PM, Feynman Liang <fli...@databricks.com> wrote:

> We have kept that private because we need to decide on a name for the
> method which evaluates on a test set (see the TODO comment
> <https://github.com/apache/spark/pull/7099/files#diff-668c79317c51f40df870d3404d8a731fR272>);
> perhaps you could push for this to happen by creating a Jira and pinging
> jkbradley and mengxr. Thanks!
>
> On Thu, Sep 17, 2015 at 8:07 AM, Hao Ren <inv...@gmail.com> wrote:
>
>> Working on spark.ml.classification.LogisticRegression.scala (Spark 1.5),
>>
>> It might be useful if we could create a summary for any given dataset,
>> not just the training set.
>> Currently, BinaryLogisticRegressionTrainingSummary is only created when
>> the model is computed from the training set.
>> As usual, we need to summarize the test set to know about the model's
>> performance. However, we cannot create our own
>> BinaryLogisticRegressionSummary for another data set (of type DataFrame),
>> because the Summary class is "private" to the classification package.
>>
>> Would it be better to remove the "private" access modifier and allow the
>> following code on the user side:
>>
>> val lr = new LogisticRegression()
>> val model = lr.fit(trainingSet)
>>
>> val binarySummary =
>>   new BinaryLogisticRegressionSummary(
>>     model.transform(testSet),
>>     lr.probabilityCol,
>>     lr.labelCol
>>   )
>>
>> binarySummary.roc
>>
>> Thus, we could use the model to summarize any data set we want.
>>
>> If there is a way to summarize the test set, please let me know. I have
>> browsed LogisticRegression.scala, but failed to find one.
>>
>> Thx.
>>
>> --
>> Hao Ren
>> Data Engineer @ leboncoin
>> Paris, France

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
S3 Read / Write makes executors deadlocked
Given the following code, which just reads from s3, then saves files to s3:

val inputFileName: String = "s3n://input/file/path"
val outputFileName: String = "s3n://output/file/path"

val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
val sparkContext = new SparkContext(conf)

// Problem here: executor threads locked
sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)

// But this one works
sparkContext.textFile(inputFileName).count()

It blocks without showing any exceptions or errors. jstack shows that all executors are locked. The thread dump is at the end of this post.

I am using spark-1.4.0 on my PC, which has 4 CPU cores. There are 21 parquet files in the input directory, 500KB / file.

In addition, if we change the last action to one that is not IO-bound, for example count(), it works. It seems that an S3 read and write in the same stage makes the executors deadlocked.

I encountered the same problem when using DataFrame load/save operations; jira created: https://issues.apache.org/jira/browse/SPARK-8869

"Executor task launch worker-3" #69 daemon prio=5 os_prio=0 tid=0x7f7bd4036800 nid=0x1296 in Object.wait() [0x7f7c1099a000]
  java.lang.Thread.State: WAITING (on object monitor)
  at java.lang.Object.wait(Native Method)
  at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
  - locked <0xe56745b8> (a org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
  at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
  at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
  at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
  at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
  at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
  at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
  at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
  at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
  at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
  at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
  at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
  at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown Source)
  at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
  at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
  at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
  at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
  at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
  at org.apache.spark.scheduler.Task.run(Task.scala:70)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

--
Hao Ren
Data Engineer @ leboncoin
Paris, France
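The trace above shows every worker waiting in MultiThreadedHttpConnectionManager.doGetConnection, i.e. for a free HTTP connection from a bounded pool. A minimal sketch of that suspected failure mode (plain Scala with java.util.concurrent only; the pool size and task count are illustrative, not taken from jets3t): when each task holds one connection for reading and then requests a second for writing, N tasks against a pool of N connections all block forever.

```scala
import java.util.concurrent.{CountDownLatch, CyclicBarrier, Executors, Semaphore, TimeUnit}

// The bounded connection pool is modeled by a Semaphore. Each task acquires
// one "connection" (the S3 read) and, while still holding it, requests a
// second one (the S3 write) -- the pattern the stack trace suggests.
object PoolDeadlockDemo {
  def runTasks(tasks: Int, poolSize: Int): Boolean = {
    val pool    = new Semaphore(poolSize)
    val barrier = new CyclicBarrier(tasks) // all tasks hold a read connection first
    val done    = new CountDownLatch(tasks)
    val exec    = Executors.newFixedThreadPool(tasks)
    (1 to tasks).foreach { _ =>
      exec.submit(new Runnable {
        def run(): Unit = {
          pool.acquire()   // connection for the read, held while writing
          barrier.await()  // every task now holds one connection
          // connection for the write; a real blocking acquire() would hang forever
          if (pool.tryAcquire(200, TimeUnit.MILLISECONDS)) {
            pool.release(); pool.release()
            done.countDown()
          }
        }
      })
    }
    val allFinished = done.await(2, TimeUnit.SECONDS)
    exec.shutdownNow()
    allFinished
  }

  def main(args: Array[String]): Unit = {
    println(runTasks(tasks = 4, poolSize = 8)) // enough connections: true
    println(runTasks(tasks = 4, poolSize = 4)) // each holds 1, waits for a 2nd: false
  }
}
```

This would also explain why count() works (read-only, one connection per task) and why the hang appears only once the parallelism reaches the pool size.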
Re: S3 Read / Write makes executors deadlocked
I have tested on another PC which has 8 CPU cores, but it still hangs when the default parallelism level is >= 4, e.g. sparkConf.setMaster("local[*]"). local[1] ~ local[3] work well; 4 is the mysterious boundary.

It seems that I am not the only one who has encountered this problem:
https://issues.apache.org/jira/browse/SPARK-8898

Here is Sean's answer on the jira above:

> this is a jets3t problem. You will have to manage to update it in your build
> or get EC2 + Hadoop 2 to work, which I think can be done. At least, this is
> just a subset of EC2 should support Hadoop 2 and/or that the EC2 support
> should move out of Spark anyway. I don't know there's another action to take
> in Spark.

But I just used sbt to get the published Spark 1.4, and it does not work on my local PC, not EC2. Seriously, I do think something should be done for Spark, because s3 read/write is quite a common use case.

Any help on this issue is highly appreciated. If you need more info, check out the jira I created:
https://issues.apache.org/jira/browse/SPARK-8869

On Thu, Jul 16, 2015 at 11:39 AM, Hao Ren <inv...@gmail.com> wrote:

> Given the following code, which just reads from s3, then saves files to s3:
>
> val inputFileName: String = "s3n://input/file/path"
> val outputFileName: String = "s3n://output/file/path"
>
> val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")
> val sparkContext = new SparkContext(conf)
>
> // Problem here: executor threads locked
> sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)
>
> // But this one works
> sparkContext.textFile(inputFileName).count()
>
> It blocks without showing any exceptions or errors. jstack shows that all
> executors are locked. The thread dump is at the end of this post.
>
> I am using spark-1.4.0 on my PC, which has 4 CPU cores. There are 21
> parquet files in the input directory, 500KB / file.
>
> In addition, if we change the last action to one that is not IO-bound, for
> example count(), it works. It seems that an S3 read and write in the same
> stage makes the executors deadlocked.
>
> I encountered the same problem when using DataFrame load/save operations;
> jira created: https://issues.apache.org/jira/browse/SPARK-8869
>
> "Executor task launch worker-3" #69 daemon prio=5 os_prio=0 tid=0x7f7bd4036800 nid=0x1296 in Object.wait() [0x7f7c1099a000]
>   java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
>   - locked <0xe56745b8> (a org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
>   at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
>   at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
>   at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
>   at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
>   at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
>   at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
>   at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
>   at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
>   at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
>   at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
>   at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
>   at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown Source)
>   at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
>   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
>   at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
>   at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
>   at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90