Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-07 Thread Hao Ren
Hi,
I am using Spark SQL 2.3.3 to read a Hive table which is partitioned by
day, hour, platform, request_status and is_sampled. The underlying data is
stored as Parquet on HDFS.
Here is the SQL query to read just *one partition*.

```
spark.sql("""
SELECT rtb_platform_id, SUM(e_cpm)
FROM raw_logs.fact_request
WHERE day = '2019-08-01'
AND hour = '00'
AND platform = 'US'
AND request_status = '3'
AND is_sampled = 1
GROUP BY rtb_platform_id
""").show
```

However, from the Spark web UI, the stage description shows:

```
Listing leaf files and directories for 201616 paths:
viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0,
...
```

It seems the job is listing every partition of the table, which takes far
too long for a query that touches only one partition. One workaround is to
use the `spark.read.parquet` API to read the Parquet files directly, since
Spark's file source is partition-aware for partitioned directory layouts.
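
For reference, here is roughly what that workaround looks like (a minimal
sketch; the base path is inferred from the listing above and may need
adjusting):

```
import org.apache.spark.sql.functions.sum
import spark.implicits._

// Read the partitioned Parquet layout directly: the file source discovers
// day/hour/platform/request_status/is_sampled as partition columns and can
// prune on them. The base path below is inferred from the listing above.
val df = spark.read
  .parquet("viewfs://root/user/bilogs/logs/fact_request")
  .where($"day" === "2019-08-01" && $"hour" === "00" &&
    $"platform" === "US" && $"request_status" === "3" && $"is_sampled" === 1)
  .groupBy($"rtb_platform_id")
  .agg(sum($"e_cpm"))

df.show()
```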

Still, I would like to know: is there a way to get the same partition
pruning through the Hive table when using the `spark.sql` API?

Any help is highly appreciated!

Thank you.

-- 
Hao Ren


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Hao Ren
Yes, it is.
You can define a UDF like that.
Basically, it is a UDF of type Int => Int whose closure contains a
non-serializable object.
The latter should cause a Task not serializable exception.

Hao

On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <bablo...@gmail.com> wrote:

> Hello Hao Ren,
>
> Doesn't the code...
>
> val add = udf {
>   (a: Int) => a + notSer.value
> }
> Mean UDF function that Int => Int ?
>
> Thanks,
> Muthu
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <inv...@gmail.com> wrote:
>
>> I am playing with spark 2.0
>> What I tried to test is:
>>
>> Create a UDF in which there is a non serializable object.
>> What I expected is when this UDF is called during materializing the
>> dataFrame where the UDF is used in "select", an task non serializable
>> exception should be thrown.
>> It depends also which "action" is called on that dataframe.
>>
>> Here is the code for reproducing the pb:
>>
>> 
>> object DataFrameSerDeTest extends App {
>>
>>   class A(val value: Int) // It is not serializable
>>
>>   def run() = {
>> val spark = SparkSession
>>   .builder()
>>   .appName("DataFrameSerDeTest")
>>   .master("local[*]")
>>   .getOrCreate()
>>
>> import org.apache.spark.sql.functions.udf
>> import spark.sqlContext.implicits._
>>
>> val notSer = new A(2)
>> val add = udf {
>>   (a: Int) => a + notSer.value
>> }
>> val df = spark.createDataFrame(Seq(
>>   (1, 2),
>>   (2, 2),
>>   (3, 2),
>>   (4, 2)
>> )).toDF("key", "value")
>>   .select($"key", add($"value").as("added"))
>>
>> df.show() // *It should not work because the udf contains a
>> non-serializable object, but it works*
>>
>> df.filter($"key" === 2).show() // *It does not work as expected
>> (org.apache.spark.SparkException: Task not serializable)*
>>   }
>>
>>   run()
>> }
>> 
>>
>> Also, I tried collect(), count(), first(), limit(). All of them worked
>> without non-serializable exceptions.
>> It seems only filter() throws the exception. (feature or bug ?)
>>
>> Any ideas ? Or I just messed things up ?
>> Any help is highly appreciated.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


[SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-07 Thread Hao Ren
I am playing with Spark 2.0.
What I tried to test is:

Create a UDF that closes over a non-serializable object.
What I expected: when this UDF is called while materializing the DataFrame
where it is used in a select, a Task not serializable exception should be
thrown.
It also depends on which action is called on that DataFrame.

Here is the code to reproduce the problem:

```
import org.apache.spark.sql.SparkSession

object DataFrameSerDeTest extends App {

  class A(val value: Int) // It is not serializable

  def run() = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameSerDeTest")
      .master("local[*]")
      .getOrCreate()

    import org.apache.spark.sql.functions.udf
    import spark.sqlContext.implicits._

    val notSer = new A(2)
    val add = udf {
      (a: Int) => a + notSer.value
    }
    val df = spark.createDataFrame(Seq(
      (1, 2),
      (2, 2),
      (3, 2),
      (4, 2)
    )).toDF("key", "value")
      .select($"key", add($"value").as("added"))

    // It should not work because the udf contains a non-serializable object, but it works
    df.show()

    // It does not work as expected (org.apache.spark.SparkException: Task not serializable)
    df.filter($"key" === 2).show()
  }

  run()
}
```

Also, I tried collect(), count(), first(), and limit(). All of them worked
without serialization exceptions.
It seems that only filter() throws the exception (feature or bug?).
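
For what it's worth, a minimal workaround sketch: copy the primitive value
out of the non-serializable object before defining the UDF, so that the
closure captures only a serializable Int, which should avoid the exception
(placed inside run() above):

```
// Workaround sketch: the UDF closure now captures only an Int,
// not the non-serializable instance of A.
val serializableValue = notSer.value
val addSafe = udf { (a: Int) => a + serializableValue }

spark.createDataFrame(Seq((1, 2), (2, 2), (3, 2), (4, 2)))
  .toDF("key", "value")
  .select($"key", addSafe($"value").as("added"))
  .filter($"key" === 2)
  .show() // no Task not serializable exception expected here
```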

Any ideas? Or did I just mess something up?
Any help is highly appreciated.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


[MLlib] Term Frequency in TF-IDF seems incorrect

2016-08-01 Thread Hao Ren
When computing term frequency, we can use either the HashingTF or the
CountVectorizer feature extractor.
However, both of them just use the raw number of times a term appears in a
document.
That is not a true frequency; actually, the count should be divided by the
length of the document.
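
To make concrete what I mean by a true (relative) frequency, here is a
plain-Scala sketch (a hypothetical helper, not the MLlib API):

```
// Relative term frequency: raw count divided by the document length,
// instead of the raw count that HashingTF / CountVectorizer produce.
def relativeTermFrequency(tokens: Seq[String]): Map[String, Double] = {
  val docLength = tokens.size.toDouble
  tokens.groupBy(identity).map { case (term, occurrences) =>
    term -> occurrences.size / docLength
  }
}

// e.g. relativeTermFrequency(Seq("a", "b", "a")) == Map("a" -> 2.0 / 3, "b" -> 1.0 / 3)
```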

Is this the intended behavior?

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


SparkSQL can not extract values from UDT (like VectorUDT)

2015-10-12 Thread Hao Ren
Hi,

Consider the following code using spark.ml to get the probability column on
a data set:

```
model.transform(dataSet)
  .selectExpr("probability.values")
  .printSchema()
```

Note that "probability" is of `vector` type, which is a UDT with the
following implementation:

```
class VectorUDT extends UserDefinedType[Vector] {

  override def sqlType: StructType = {
    // type: 0 = sparse, 1 = dense
    // We only use "values" for dense vectors, and "size", "indices", and "values"
    // for sparse vectors. The "values" field is nullable because we might want
    // to add binary vectors later, which uses "size" and "indices", but not "values".
    StructType(Seq(
      StructField("type", ByteType, nullable = false),
      StructField("size", IntegerType, nullable = true),
      StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true),
      StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
  }

  // ...

}
```


`values` is one of its attributes. However, it cannot be extracted.

The first code snippet results in an exception from complexTypeExtractors:

org.apache.spark.sql.AnalysisException: Can't extract value from
probability#743;
  at ...
  at ...
  at ...
...

Here is the code:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala#L49

It seems that the pattern matching does not take UDT into consideration.

Is this an intended feature? If not, I would like to create a PR to fix it.
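
In the meantime, a workaround sketch that unwraps the UDT with a UDF instead
of selectExpr (assuming the "probability" column holds an mllib Vector):

```
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Workaround sketch: a UDF receives the user type (Vector) directly,
// so we can expose its values as a plain array column.
val vectorValues = udf { (v: Vector) => v.toArray }

model.transform(dataSet)
  .select(vectorValues(col("probability")).as("values"))
  .printSchema()
```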

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: [MLlib] BinaryLogisticRegressionSummary on test set

2015-09-18 Thread Hao Ren
Thank you for the reply.

I have created a JIRA issue and pinged mengxr.

Here is the link: https://issues.apache.org/jira/browse/SPARK-10691

I did not find jkbradley on JIRA, but I saw he is on GitHub.

BTW, should I create a pull request that removes the private modifier, for
further discussion?

Thx.

On Thu, Sep 17, 2015 at 6:44 PM, Feynman Liang <fli...@databricks.com>
wrote:

> We have kept that private because we need to decide on a name for the
> method which evaluates on a test set (see the TODO comment
> <https://github.com/apache/spark/pull/7099/files#diff-668c79317c51f40df870d3404d8a731fR272>);
> perhaps you could push for this to happen by creating a Jira and pinging
> jkbradley and mengxr. Thanks!
>
> On Thu, Sep 17, 2015 at 8:07 AM, Hao Ren <inv...@gmail.com> wrote:
>
>> Working on spark.ml.classification.LogisticRegression.scala (spark 1.5),
>>
>> It might be useful if we can create a summary for any given dataset, not
>> just training set.
>> Actually, BinaryLogisticRegressionTrainingSummary  is only created when
>> model is computed based on training set.
>> As usual, we need to summary test set to know about the model performance.
>> However, we can not create our own BinaryLogisticRegressionSummary for
>> other date set (of type DataFrame), because the Summary class is "private"
>> in classification package.
>>
>> Would it be better to remove the "private" access modifier and allow the
>> following code on user side:
>>
>> val lr = new LogisticRegression()
>>
>> val model = lr.fit(trainingSet)
>>
>> val binarySummary =
>>   new BinaryLogisticRegressionSummary(
>> model.transform(testSet),
>> lr.probabilityCol,
>> lr.labelCol
>>   )
>>
>> binarySummary.roc
>>
>>
>> Thus, we can use the model to summary any data set we want.
>>
>> If there is a way to summary test set, please let me know. I have browsed
>> LogisticRegression.scala, but failed to find one.
>>
>> Thx.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


S3 Read / Write makes executors deadlocked

2015-07-16 Thread Hao Ren
Given the following code, which just reads from S3 and then saves the files
back to S3:

```
import org.apache.spark.{SparkConf, SparkContext}

val inputFileName: String = "s3n://input/file/path"
val outputFileName: String = "s3n://output/file/path"
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")

val sparkContext = new SparkContext(conf)

// Problem here: executor threads get locked
sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)
// But this one works
sparkContext.textFile(inputFileName).count()
```

It blocks without showing any exceptions or errors. jstack shows that
all executor threads are locked. The thread dump is at the end of this post.

I am using spark-1.4.0 on my PC, which has 4 CPU cores.
There are 21 parquet files in the input directory, 500 KB per file.

In addition, if we change the last action to one that is not I/O-bound, for
example count(), it works.
It seems that reading from and writing to S3 in the same stage deadlocks the
executors.

I encountered the same problem when using DataFrame load/save
operations, jira created:
https://issues.apache.org/jira/browse/SPARK-8869

Executor task launch worker-3 #69 daemon prio=5 os_prio=0
tid=0x7f7bd4036800 nid=0x1296 in Object.wait()
[0x7f7c1099a000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
- locked 0xe56745b8 (a
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
at 
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown 
Source)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
at 
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: S3 Read / Write makes executors deadlocked

2015-07-16 Thread Hao Ren
I have tested on another PC which has 8 CPU cores.
But it hangs when the default parallelism level is >= 4, e.g.
sparkConf.setMaster("local[*]").
local[1] through local[3] work well.

4 is the mysterious boundary.

It seems that I am not the only one encountered this problem:
https://issues.apache.org/jira/browse/SPARK-8898

Here is Sean's answer on the JIRA above:
this is a jets3t problem. You will have to manage to update it in your
build or get EC2 + Hadoop 2 to work, which I think can be done. At least,
this is just a subset of EC2 should support Hadoop 2 and/or that the EC2
support should move out of Spark anyway. I don't know there's another
action to take in Spark.

But I just used sbt to get the published Spark 1.4, and it does not work on
my local PC, not on EC2.
Seriously, I do think something should be done in Spark, because S3
read/write is quite a common use case.
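
For reference, if I understand Sean's suggestion correctly, overriding jets3t
in an sbt build would look roughly like the line below; the coordinates are
the real jets3t artifact, the version is only an example, and I have not
verified that it fixes the deadlock:

```
// build.sbt (sketch): force a newer jets3t than the one Hadoop pulls in.
// The version here is an example, not a tested fix.
dependencyOverrides += "net.java.dev.jets3t" % "jets3t" % "0.9.3"
```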

Any help on this issue is highly appreciated.
If you need more info, check out the JIRA I created:
https://issues.apache.org/jira/browse/SPARK-8869

On Thu, Jul 16, 2015 at 11:39 AM, Hao Ren inv...@gmail.com wrote:

 Given the following code which just reads from s3, then saves files to s3

 

 val inputFileName: String = s3n://input/file/path
 val outputFileName: String = s3n://output/file/path
 val conf = new 
 SparkConf().setAppName(this.getClass.getName).setMaster(local[4])

 val sparkContext = new SparkContext(conf)

 // Problems here: executors thread locked
 sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)
 // But this one works
 sparkContext.textFile(inputFileName).count()
 

 It blocks without showing any exceptions or errors. jstack shows that all 
 executors are locked. The thread dump is in end of this post.

 I am using spark-1.4.0 on my PC which has 4 CPU cores.
 There are 21 parquet files in the input directory, 500KB / file.

 In addition, if we change the last action to a non IO bounded one, for 
 example, count(). It works.
 It seems that S3 read and write in the same stage makes executors deadlocked.

 I encountered the same problem when using DataFrame load/save operations, 
 jira created: https://issues.apache.org/jira/browse/SPARK-8869

 Executor task launch worker-3 #69 daemon prio=5 os_prio=0 
 tid=0x7f7bd4036800 nid=0x1296 in Object.wait() [0x7f7c1099a000]
java.lang.Thread.State: WAITING (on object monitor)
   at java.lang.Object.wait(Native Method)
   at 
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
   - *locked* 0xe56745b8 (a 
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
   at 
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
   at 
 org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
   at 
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
   at 
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
   at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
   at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
   at 
 org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
   at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at 
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
   at 
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
   at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown 
 Source)
   at 
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
   at 
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
   at 
 org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
   at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90