Re: Spark ANSI SQL Support

2017-01-17 Thread Andrew Ash
Rishabh,

Have you come across any ANSI SQL queries that Spark SQL didn't support?
I'd be interested to hear if you have.

Andrew

On Tue, Jan 17, 2017 at 8:14 PM, Deepak Sharma 
wrote:

> From spark documentation page:
> Spark SQL can now run all 99 TPC-DS queries.
>
> On Jan 18, 2017 9:39 AM, "Rishabh Bhardwaj"  wrote:
>
>> Hi All,
>>
>> Does Spark 2.0 Sql support full ANSI SQL query standards?
>>
>> Thanks,
>> Rishabh.
>>
>


Middleware-wrappers for Spark

2017-01-17 Thread Rick Moritz
Hi List,

I've been following several projects with great interest over the past
few years, and I've continued to wonder why they're not moving towards
being supported by mainstream Spark distributions, and why they aren't
mentioned more frequently when it comes to enterprise adoption of Spark.

The "middleware" components I've come across, each of which sounded like it
would largely succeed the classic Spark APIs (at least in its respective
domain), are the following [in order of appearance]:

* Spark JobServer - This was one of the first examples I came across. It was
quite exciting at the time, but I've not heard much of it since. I assume the
focus has been on stabilizing the code base.

* Oryx2 - This was more focused on a particular issue, and looked to be a
very nice framework for deploying real-time analytics --- but again, no
real traction. In fact, I've heard of PoCs being done by/for Cloudera, to
demo Lambda-Architectures with Spark, and this was not showcased.

* Livy - Although Livy still appears to live, I'm not really seeing the
progress that I anticipated after first hearing about it at the 2015 Spark
Summit Europe. Maybe it's because the documentation isn't quite there yet,
maybe it's because features are missing -- somehow, from my last look at it,
it's not quite enterprise-ready yet, while offering a feature-set that
should be driving enterprise adoption.

* Mist - Just discovered it today, thinking, "great, ANOTHER middleware"
and prompting this post. It looks quite fully featured, but can it succeed?
On the plus side, it's linked to a small, focused business, on the down
side it's linked to a small, focused business. Positive, since that drives
development along nicely; negative, since it inhibits adoption in the
enterprise space.


Now, with that said - why did these products not gain bigger traction? Is
it because Spark isn't quite ready yet? Is it because of a missed marketing
opportunity?

And on another note: Should Spark integrate such a wrapper "by default"?
It's a step further on from the SparkSQL Thrift interface, towards offering
not just programming APIs, but service APIs. Considering that there are so
many different interpretations of how this should be solved, bundling the
effort into a default implementation could be beneficial. On the other
hand, feature creep of this magnitude probably isn't desirable.

I'd hope to hear some community opinions, in particular from
developers/users of these or other similar projects. If I overlooked your
similar project: Please pitch it -- I think this part of the ecosystem is
shaping up to be quite exciting.

Also, I'm looking at this with my enterprise-glasses on: So fine-grained
user authorization and authentication features are very important, as are
consistency and resiliency features. Since long-running interactive
Spark-jobs are still a mixed bag stability-wise, this layer of middleware
should provide a necessary buffer between crashes of the driver program,
and serving results.
Ecosystem support is also a must - why aren't there Tableau connectors for
(some of) these APIs? [Because they're too obscure...]

A closing note: This could of course just be the open-source/enterprise
chicken-and-egg issue: open-source projects without large-scale vendor support
aren't interesting for the enterprise, and enterprise features aren't
interesting for the non-enterprise developer. And worse, I wonder how many
in-house custom solutions/extensions of these projects exist in the wild,
because enterprise developers aren't usually allowed to share code back
into open-source projects.

Thanks for putting up with this post this far,

Best

Rick


Re: Spark/Parquet/Statistics question

2017-01-17 Thread Michael Segel
Hi, 
Lexicographically speaking, min/max should work because strings support a
comparison operator. Anything that supports ordering comparisons (<, >, <=, >=,
==, …) can also support min and max functions.

I guess the question is whether Spark does support this, and if not, why?
Yes, it makes sense.
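
As a quick illustration (a minimal sketch, assuming a Spark 2.x SparkSession
named "spark"): min/max on string columns does work at the SQL level, using
lexicographic ordering -- the open question below is only whether the Parquet
writer records those statistics.

import org.apache.spark.sql.functions.{min, max}

val df = spark.sql("select id, cast(id as string) text from range(1000)")
df.agg(min("text"), max("text")).show()
// expected (lexicographic): min(text) = "0", max(text) = "999"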



> On Jan 17, 2017, at 9:17 AM, Jörn Franke  wrote:
> 
> Hallo,
> 
> I am not sure what you mean by min/max for strings. I do not know if this 
> makes sense. What the ORC format has is bloom filters for strings etc. - are 
> you referring to this? 
> 
> In order to apply min/max filters Spark needs to read the meta data of the 
> file. If the filter is applied or not - this you can see from the number of 
> bytes read.
> 
> 
> Best regards
> 
>> On 17 Jan 2017, at 15:28, djiang  wrote:
>> 
>> Hi, 
>> 
>> I have been looking into how Spark stores statistics (min/max) in Parquet as
>> well as how it uses the info for query optimization.
>> I have got a few questions.
>> First setup: Spark 2.1.0, the following sets up a Dataframe of 1000 rows,
>> with a long type and a string type column.
>> They are sorted by different columns, though.
>> 
>> scala> spark.sql("select id, cast(id as string) text from
>> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
>> scala> spark.sql("select id, cast(id as string) text from
>> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
>> 
>> I added some code to parquet-tools to print out stats and examine the
>> generated parquet files:
>> 
>> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
>> /secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>>  
>> file:   
>> file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>>  
>> creator: parquet-mr version 1.8.1 (build
>> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
>> extra:   org.apache.spark.sql.parquet.row.metadata =
>> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>>  
>> 
>> file schema: spark_schema 
>> 
>> id:  REQUIRED INT64 R:0 D:0
>> text:REQUIRED BINARY O:UTF8 R:0 D:0
>> 
>> row group 1: RC:5 TS:133 OFFSET:4 
>> 
>> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
>> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
>> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
>> ENC:PLAIN,BIT_PACKED
>> 
>> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
>> /secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>>  
>> file:   
>> file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>>  
>> creator: parquet-mr version 1.8.1 (build
>> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
>> extra:   org.apache.spark.sql.parquet.row.metadata =
>> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>>  
>> 
>> file schema: spark_schema 
>> 
>> id:  REQUIRED INT64 R:0 D:0
>> text:REQUIRED BINARY O:UTF8 R:0 D:0
>> 
>> row group 1: RC:5 TS:140 OFFSET:4 
>> 
>> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
>> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
>> text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
>> ENC:PLAIN,BIT_PACKED
>> 
>> So the question is why does Spark, particularly 2.1.0, only generate min/max
>> for numeric columns, but not string (BINARY) fields, even if the string
>> field is included in the sort? Maybe I missed a configuration?
>> 
>> The second issue, is how can I confirm Spark is utilizing the min/max?
>> scala> sc.setLogLevel("INFO")
>> scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where
>> id=4").show
>> I got many lines like this:
>> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
>> and(noteq(id, null), eq(id, 4))
>> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
>> file:///secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
>> range: 0-558, partition values: [empty row]
>> ...
>> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
>> and(noteq(id, null), eq(id, 4))
>> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
>> file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
>> range: 0-574, partition values: [empty row]
>> ...
>> 
>> The question is it looks like Spark is scanning every file, even if from the
>> min/max, 

Quick but probably silly question...

2017-01-17 Thread Michael Segel
Hi, 
While the Parquet file is immutable and the data sets are immutable, how does
Spark SQL handle updates or deletes?
I mean, if I read in a file using SQL into an RDD, mutate it, e.g. delete a row,
and then persist it, I now have two files. If I re-read the table back in … will
I see duplicates or not?

The larger issue is how to handle mutable data in a multi-user / multi-tenant 
situation and using Parquet as the storage. 

Would this be the right tool? 

W.R.T ORC files, mutation is handled by Tez. 

Thanks in Advance, 

-Mike



spark main thread quits, but the JVM of the driver doesn't crash

2017-01-17 Thread John Fang
My Spark main thread creates some daemon threads. Then the Spark application
throws some exceptions and the main thread quits, but the JVM of the driver
doesn't exit. What can I do?
for example:
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

//daemon thread
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
def run() {
  try {
System.out.println("runable")
  } catch {
case e: Exception => {
  System.out.println("ScheduledTask persistAllConsumerOffset 
exception", e)
}
  }
}
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)


val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 
10)
wordCounts.foreachRDD{rdd =>
  rdd.collect().foreach(println)
  throw new RuntimeException
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception => {
System.out.println("end!")
throw e
  }
}



Re: Spark/Parquet/Statistics question

2017-01-17 Thread Jörn Franke
Hallo,

I am not sure what you mean by min/max for strings. I do not know if this makes 
sense. What the ORC format has is bloom filters for strings etc. - are you 
referring to this? 

In order to apply min/max filters, Spark needs to read the metadata of the
file. Whether the filter is applied or not, you can see from the number of
bytes read.


Best regards

> On 17 Jan 2017, at 15:28, djiang  wrote:
> 
> Hi, 
> 
> I have been looking into how Spark stores statistics (min/max) in Parquet as
> well as how it uses the info for query optimization.
> I have got a few questions.
> First setup: Spark 2.1.0, the following sets up a Dataframe of 1000 rows,
> with a long type and a string type column.
> They are sorted by different columns, though.
> 
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
> 
> I added some code to parquet-tools to print out stats and examine the
> generated parquet files:
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> /secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>  
> file:   
> file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>  
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>  
> 
> file schema: spark_schema 
> 
> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:133 OFFSET:4 
> 
> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> /secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>  
> file:   
> file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>  
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>  
> 
> file schema: spark_schema 
> 
> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:140 OFFSET:4 
> 
> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> So the question is why does Spark, particularly 2.1.0, only generate min/max
> for numeric columns, but not string (BINARY) fields, even if the string
> field is included in the sort? Maybe I missed a configuration?
> 
> The second issue, is how can I confirm Spark is utilizing the min/max?
> scala> sc.setLogLevel("INFO")
> scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where
> id=4").show
> I got many lines like this:
> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> and(noteq(id, null), eq(id, 4))
> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
> file:///secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
> range: 0-558, partition values: [empty row]
> ...
> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> and(noteq(id, null), eq(id, 4))
> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
> file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
> range: 0-574, partition values: [empty row]
> ...
> 
> The question is that it looks like Spark is scanning every file, even though,
> from the min/max, Spark should be able to determine that only part-0 has the
> relevant data. Or maybe I read it wrong and Spark is skipping the files? Maybe
> Spark can only use partition values for data skipping?
> 
> Thanks,
> 
> Dong
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Parquet-Statistics-question-tp28312.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 

Struct as parameter

2017-01-17 Thread Tomas Carini
Hi all.
I have the following schema that I need to filter with spark sql

|-- msg_id: string (nullable = true)
|-- country: string (nullable = true)
|-- sent_ts: timestamp (nullable = true)
|-- marks: array (nullable = true)
||-- element: struct (containsNull = true)
|||-- namespace: string (nullable = true)
|||-- name: string (nullable = true)
|||-- value: integer (nullable = true)


I'd like to write something like
sqlContext.sql("select * from df where array_contains(tags,
$thestructparameter)").count
But I was not able to write it.

What I'm doing right now is to create a DataFrame with one row and select it
from there. The element matching in the array works OK, but I was not able to
write a literal or construct a variable to pass to the SQL statement.
Any help would be very much appreciated.
TIA.
Tomas.

case class Mark(namespace: String, name: String, value: Option[Int])
case class Marks(tag: Mark)
// assumes spark-shell, where the implicits needed for .toDF are already imported
val mark0 = sc.parallelize(Seq(Marks(Mark("a-mark", "0", null)))).toDF
mark0.createOrReplaceTempView("mark0")
val mark1 = sc.parallelize(Seq(Marks(Mark("a-mark", "1", null)))).toDF
mark1.createOrReplaceTempView("mark1")
def withmarks = sqlContext.sql("select df.*, 0 mark from df where array_contains(marks, (select * from mark0)) union all select df.*, 1 mark from df where array_contains(marks, (select * from mark1))")
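
A hedged alternative sketch, in case it helps: the struct literal can be spelled
out inline with named_struct instead of going through a one-row helper
DataFrame. This assumes the array column is really named "marks"; whether
array_contains matches it depends on the element type (including nullability)
lining up exactly, so treat it as something to try rather than a guaranteed fix.

val q = sqlContext.sql("""
  select * from df
  where array_contains(marks,
    named_struct('namespace', 'a-mark', 'name', '0', 'value', cast(null as int)))
""")
q.count()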


spark main thread quits, but the driver doesn't crash on a standalone cluster

2017-01-17 Thread John Fang
My Spark main thread creates some daemon threads, which may be timer threads.
Then the Spark application throws some exceptions and the main thread quits,
but the JVM of the driver doesn't exit on a standalone cluster. The problem
doesn't happen on a YARN cluster, because the application master monitors the
main thread of the application, but the standalone cluster can't. For example:

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

//daemon thread
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })

scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
def run() {
  try {
System.out.println("runable")
  } catch {
case e: Exception => {
  System.out.println("ScheduledTask persistAllConsumerOffset 
exception", e)
}
  }
}
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

Thread.sleep(1005)


val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 
10)
wordCounts.foreachRDD{rdd =>
  rdd.collect().foreach(println)
  throw new RuntimeException  //exception
}
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception => {
System.out.println("end!")
throw e
  }
}
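
One likely reason the JVM stays alive (a hedged sketch, not a definitive
diagnosis): the ThreadFactory above creates a non-daemon thread, and any live
non-daemon thread keeps the JVM running after the main thread dies. Marking the
thread as a daemon, and/or forcing the exit in the outer catch block, should let
the driver terminate:

val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "Driver-Commit-Thread")
      t.setDaemon(true)  // let the JVM exit once the main thread is gone
      t
    }
  })

// ...and in the outer catch block:
// case e: Exception =>
//   ssc.stop(stopSparkContext = true, stopGracefully = false)
//   System.exit(1)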




Re: Quick but probably silly question...

2017-01-17 Thread Jörn Franke
You run compaction, i.e. save the modified/deleted records in a dedicated delta
file. Every now and then you compare the original and delta files and create a
new version. When querying before compaction, you need to check both the
original and the delta file. I don't think ORC needs Tez for this, but it
probably improves performance.
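
A minimal sketch of that compaction idea in Spark terms (the paths and the join
key "id" are illustrative, not an existing API), assuming a Spark 2.x session
named "spark":

val base    = spark.read.parquet("/data/events")          // immutable base dataset
val deletes = spark.read.parquet("/data/events_deletes")  // delta: keys of deleted rows

// query-time view before compaction: base minus deleted keys
val current = base.join(deletes, Seq("id"), "left_anti")

// compaction: materialize the merged view as a new base version
current.write.mode("overwrite").parquet("/data/events_v2")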

> On 17 Jan 2017, at 17:21, Michael Segel  wrote:
> 
> Hi, 
> While the parquet file is immutable and the data sets are immutable, how does 
> sparkSQL handle updates or deletes? 
> I mean if I read in a file using SQL in to an RDD, mutate it, eg delete a 
> row, and then persist it, I now have two files. If I reread the table back in 
> … will I see duplicates or not? 
> 
> The larger issue is how to handle mutable data in a multi-user / multi-tenant 
> situation and using Parquet as the storage. 
> 
> Would this be the right tool? 
> 
> W.R.T ORC files, mutation is handled by Tez. 
> 
> Thanks in Advance, 
> 
> -Mike
> 




Re: How State of mapWithState is distributed on nodes

2017-01-17 Thread manasdebashiskar
Does anyone have an answer?
How does the state get distributed among multiple nodes?
I have seen that in a "mapWithState"-based workflow the streaming job simply
hangs when the node containing all the state dies because of OOM.
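
For what it's worth, the state in mapWithState is hash-partitioned by key, so
all of it ending up on one node usually points at key skew or too few state
partitions. A hedged sketch of spreading it over more partitions explicitly
(trackStateFunc and keyedStream are placeholders, not existing names):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StateSpec

val spec = StateSpec.function(trackStateFunc _)
  .partitioner(new HashPartitioner(200))  // or .numPartitions(200)

val stateDStream = keyedStream.mapWithState(spec)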

..Manas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-State-of-mapWithState-is-distributed-on-nodes-tp27593p28313.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Middleware-wrappers for Spark

2017-01-17 Thread Rick Moritz
To give a bit of background to my post: I'm currently looking into
evaluating whether it could be possible to deploy a Spark-powered
data-warehouse-like structure.
In particular, the client is interested in evaluating Spark's in-memory
caches as a "transient persistent layer". Although in theory this could be
done, I'm now looking into different avenues of how to do it properly.

The various job-servers initially appeared to be an option, until I
actually looked at the current feature levels and progress. Much like Hive
2/LLAP, a case of "might work [for you] in around two years or so".

So now it's a matter of finding out why that's the case, how to actually get
to the point where these features could work in two years, and whether they
should work at all.

On Tue, Jan 17, 2017 at 6:38 PM, Sean Owen  wrote:

> On Tue, Jan 17, 2017 at 4:49 PM Rick Moritz  wrote:
>
>> * Oryx2 - This was more focused on a particular issue, and looked to be a
>> very nice framework for deploying real-time analytics --- but again, no
>> real traction. In fact, I've heard of PoCs being done by/for Cloudera, to
>> demo Lambda-Architectures with Spark, and this was not showcased.
>>
>
> This one is not like the others IMHO (I'm mostly the author). It
> definitely doesn't serve access to Spark jobs. It's ML-focused, hence, much
> more narrowly applicable than what 'lambda' would encompass. In practice
> it's used as an application, for recommendations, only. Niche, but does
> what it does well. It isn't used as a general platform by even Cloudera.
> It's framed as a reference architecture for an app.
>


I agree that the others are more generalized approaches, but I found Oryx2
very interesting, since it appears to provide exactly what you mention. As
a reference architecture for an app, of course, it's not quite as useful. It
basically boils down to the question of "official support", which our
enterprise customers usually require.


>
>
>>
>> * Livy - Although Livy still appears to live, I'm not really seeing the
>> progress, that I anticipated after first hearing about it at the 2015 Spark
>> Summit Europe. Maybe it's because the documentation isn't quite there yet,
>> maybe it's because features are missing -- somehow from my last look at it,
>> it's not enterprise-ready quite yet, while offering a feature-set that
>> should be driving enterprise adoption.
>>
>
> This and Job Server (and about 3 other job server tools) do the mostly
> same thing. Livy is a Cloudera project that began to support a
> notebook-like tool in Hue. I think this didn't really go live because of
> grander plans that should emerge from the Sense acquisition. Livy's still
> definitely active, and I think it was created instead of adopting another
> tool at the time because it was deemed easier to build in the enterprise-y
> requirements like security from scratch. Go figure. I don't know how much
> Livy is really meant to be a general tool pushed for general consumption.
> It has existed to support CDH-related notebook tools, as I understand it,
> to date.
>

Even HDP has adopted this as a first-class citizen as an additional
middleware for Zeppelin (hopefully enabling the latter to reduce its
complexity in that department over time). So it definitely has the most
support from the variants I mentioned - but at least from a documentation
stand-point it's still very early days, and not being on the Apache
incubator might hinder adoption. Getting out from under Hue's umbrella was
a strong move though, and I will surely keep watching it - I was just
disappointed with the year-over-year progress, in particular given that it
had strong "manufacturer support".


>
>> Now, with that said - why did these products not gain bigger traction? Is
>> it because Spark isn't quite ready yet? Is it because of a missed marketing
>> opportunity?
>>
>
> I have a collection of guesses. First, you may be surprised how early it
> is for most companies to be using Spark in a basic way, let alone with
> 'middleware'.
>

I'm quite close to the "front lines" - in particular on the "enterprise
level", and I think adoption is really taking off right now. Hence my
interest in looking into abstraction layers (and maybe Hive is enough) for
Spark, which enable the different end-users to each do their thing, while
both sharing data and hiding secrets.


>
> Maybe none are all that mature? none that i know have vendor backing (Livy
> is not formally supported by Cloudera, even). Maybe the small market is
> fragmented?
>

That does absolutely appear to be the case -- which of course raises the
question of why vendor backing is currently so hard to come by. Given the
multitude of attempts at doing the same thing, I think the need for
a solution in this space is pretty evident.

>
> Not all of these things do quite the same thing. The space of 'apps' and
> 'middleware' is big.
>

Agreed - and that's before mentioning even more complex products, such as
Ignite, 

Re: Middleware-wrappers for Spark

2017-01-17 Thread Sean Owen
On Tue, Jan 17, 2017 at 4:49 PM Rick Moritz  wrote:

> * Oryx2 - This was more focused on a particular issue, and looked to be a
> very nice framework for deploying real-time analytics --- but again, no
> real traction. In fact, I've heard of PoCs being done by/for Cloudera, to
> demo Lambda-Architectures with Spark, and this was not showcased.
>

This one is not like the others IMHO (I'm mostly the author). It definitely
doesn't serve access to Spark jobs. It's ML-focused, hence, much more
narrowly applicable than what 'lambda' would encompass. In practice it's
used as an application, for recommendations, only. Niche, but does what it
does well. It isn't used as a general platform by even Cloudera. It's
framed as a reference architecture for an app.


>
> * Livy - Although Livy still appears to live, I'm not really seeing the
> progress, that I anticipated after first hearing about it at the 2015 Spark
> Summit Europe. Maybe it's because the documentation isn't quite there yet,
> maybe it's because features are missing -- somehow from my last look at it,
> it's not enterprise-ready quite yet, while offering a feature-set that
> should be driving enterprise adoption.
>

This and Job Server (and about 3 other job server tools) do mostly the same
thing. Livy is a Cloudera project that began to support a notebook-like
tool in Hue. I think this didn't really go live because of grander plans
that should emerge from the Sense acquisition. Livy's still definitely
active, and I think it was created instead of adopting another tool at the
time because it was deemed easier to build in the enterprise-y requirements
like security from scratch. Go figure. I don't know how much Livy is really
meant to be a general tool pushed for general consumption. It has existed
to support CDH-related notebook tools, as I understand it, to date.

(Even though I'm @cloudera.com I don't interface directly with either of
the above-mentioned teams so take my comment with a little grain of salt)


>
> * Mist - Just discovered it today, thinking, "great, ANOTHER middleware"
> and prompting this post. It looks quite fully featured, but can it succeed?
> On the plus side, it's linked to a small, focused business, on the down
> side it's linked to a small, focused business. Positive, since that drives
> development along nicely; negative, since it inhibits adoption in the
> enterprise space.
>

Less like the others, if it's a model-serving tool. It sounds like
OpenScoring in some ways. But yes, it does seem like it tries to expose access
to Spark jobs. I hadn't heard of it until you mentioned it.



> Now, with that said - why did these products not gain bigger traction? Is
> it because Spark isn't quite ready yet? Is it because of a missed marketing
> opportunity?
>

I have a collection of guesses. First, you may be surprised how early it is
for most companies to be using Spark in a basic way, let alone with
'middleware'.

Maybe none are all that mature? None that I know of have vendor backing (Livy
is not formally supported by Cloudera, even). Maybe the small market is
fragmented?

Not all of these things do quite the same thing. The space of 'apps' and
'middleware' is big.

Not all (many?) use cases require long-running Spark jobs. That is what
these tools provide. It's not what Spark was built for. Using it this way
has rough edges. I actually think it's this, relative lack of demand.


>
> And on another note: Should Spark integrate such a wrapper "by default"?
> It's a step further on from the SparkSQL Thrift interface, towards offering
> not just programming API's, but service-APIs. Considering that there are so
> many different interpretations of how this should be solved, bundling the
> effort into a default-implementation could be beneficial. On the other
> hand, feature creep of this magnitude probably isn't desirable.
>

I don't think so, mostly because there's no strong reason to bless one and
reject the others, and because I still think this isn't something Spark was
built for. Spark is already a very large project and there has to be some
boundary where it ends and the ecosystem begins. There are dis-economies
(?) of scale for OSS projects.



> Also, I'm looking at this with my enterprise-glasses on: So fine-grained
> user authorization and authentication features are very important, as are
> consistency and resiliency features. Since long-running interactive
> Spark-jobs are still a mixed bag stability-wise, this
>

Security integration is the big issue as I understand. I don't think any of
these tools can fully guarantee resiliency and consistency in the general
case. A Spark job can have one driver only and there is no HA. Resource
managers already manage restarting failed drivers. I don't know if that's
the issue.


layer of middleware should provide a necessary buffer between crashes of
> the driver program, and serving results.
> Ecosystem support is also a must - why aren't there Tableau connectors for
> 

Re: Spark/Parquet/Statistics question

2017-01-17 Thread Dong Jiang
Hi,

Thanks for the response.
I am referring to a presentation by Ryan Blue. 
http://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide
Specifically, on page 27, you can clearly generate min/max for BINARY. I don't
know for sure whether this is generated by Spark or Pig.
In terms of filtering, is reviewing the number of bytes read the only way to
figure it out? It is not direct evidence of whether file skipping is working. If I
understand correctly, Spark will first read the min/max values to determine which
files can be skipped and then start a second read to get the actual data, so
reasoning from the number of bytes read can also be tricky.
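
A cheaper cross-check than byte counts may be the physical plan (a minimal
sketch for Spark 2.1): it at least shows whether the predicate was pushed down
to Parquet, even though file/row-group skipping itself only shows up in input
size metrics or Parquet debug logging.

val q = spark.sql("select * from parquet.`/secret/spark21-sortById` where id = 4")
q.explain(true)  // look for PushedFilters: [IsNotNull(id), EqualTo(id,4)] in the FileScan node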

Thanks,

On 1/17/17, 10:17 AM, "Jörn Franke"  wrote:

Hallo,

I am not sure what you mean by min/max for strings. I do not know if this 
makes sense. What the ORC format has is bloom filters for strings etc. - are 
you referring to this? 

In order to apply min/max filters Spark needs to read the meta data of the 
file. If the filter is applied or not - this you can see from the number of 
bytes read.


Best regards

> On 17 Jan 2017, at 15:28, djiang  wrote:
> 
> Hi, 
> 
> I have been looking into how Spark stores statistics (min/max) in Parquet 
as
> well as how it uses the info for query optimization.
> I have got a few questions.
> First setup: Spark 2.1.0, the following sets up a Dataframe of 1000 rows,
> with a long type and a string type column.
> They are sorted by different columns, though.
> 
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
> 
> I added some code to parquet-tools to print out stats and examine the
> generated parquet files:
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> 
/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
 
> file:   
> 
file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
 
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> 
{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
 
> 
> file schema: spark_schema 
> 

> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:133 OFFSET:4 
> 

> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> 
/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
 
> file:   
> 
file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
 
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> 
{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
 
> 
> file schema: spark_schema 
> 

> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:140 OFFSET:4 
> 

> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> So the question is why does Spark, particularly 2.1.0, only generate min/max
> for numeric columns, but not string (BINARY) fields, even if the string
> field is included in the sort? Maybe I missed a configuration?
> 
> The second issue, is how can I confirm Spark is utilizing the min/max?
> scala> sc.setLogLevel("INFO")
> scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where
> id=4").show
> I got many lines like this:
> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> and(noteq(id, null), eq(id, 4))
> 17/01/17 09:23:35 INFO FileScanRDD: Reading 

Re: TDD in Spark

2017-01-17 Thread Lars Albertsson
My advice, short version:
* Start by testing one job per test.
* Use Scalatest or a standard framework.
* Generate input datasets with Spark routines, write to local file.
* Run job with local master.
* Read output with Spark routines, validate only the fields you care
about for the test case at hand.
* Focus on building a functional regression test suite with small test
cases before testing with large input datasets. The former improves
productivity more.

Avoid:
* Test frameworks coupled to your processing technology - they will
make it difficult to switch.
* Spending much effort on small unit tests. Internal interfaces in
Spark tend to be volatile, and testing against them results in high
maintenance costs.
* Input files checked in to version control. They are difficult to
maintain. Generate input files with code instead.
* Expected output files checked in to VC. Same reason. Validate
selected fields instead.

For a longer answer, please search for my previous posts to the user
list, or watch this presentation: https://vimeo.com/192429554

Slides at 
http://www.slideshare.net/lallea/test-strategies-for-data-processing-pipelines-67244458
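
A minimal sketch of the first few points (ScalaTest, local master, generated
input, validating only selected fields); the word count under test is a
stand-in, not an existing class:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordCountJobSpec extends FunSuite with BeforeAndAfterAll {
  private var spark: SparkSession = _

  override def beforeAll(): Unit = {
    spark = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
  }

  override def afterAll(): Unit = spark.stop()

  test("counts words in generated input") {
    import spark.implicits._
    // generate input with Spark instead of checking files into version control
    val input = Seq("a b", "b c").toDS()
    val result = input.flatMap(_.split(" ")).groupBy("value").count()
    // validate only the fields this test cares about
    val counts = result.as[(String, Long)].collect().toMap
    assert(counts("b") == 2L)
  }
}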


Regards,



Lars Albertsson
Data engineering consultant
www.mapflat.com
https://twitter.com/lalleal
+46 70 7687109
Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com


On Sun, Jan 15, 2017 at 7:14 PM, A Shaikh  wrote:
> What's the most popular testing approach for a Spark app? I am looking for
> something along the lines of TDD.




Re: filter rows by all columns

2017-01-17 Thread Xiaomeng Wan
Thank you Hyukjin,
It works. This is what I ended up doing:

df.filter(_.toSeq.zipWithIndex.forall(v => v._1.toString().toDouble - means(v._2) <= 3*staddevs(v._2))).show()
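
In case it helps others, a hedged sketch of how the means and stddevs arrays
referenced above can be built generically (assuming every column is numeric or
castable to double):

import org.apache.spark.sql.functions.{avg, col, stddev}

val cols    = df.columns
val statRow = df.select(cols.flatMap(c => Seq(avg(col(c)), stddev(col(c)))): _*).head()
val means   = cols.indices.map(i => statRow.getDouble(2 * i)).toArray
val stddevs = cols.indices.map(i => statRow.getDouble(2 * i + 1)).toArray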


Regards,

Shawn

On 16 January 2017 at 18:30, Hyukjin Kwon  wrote:

> Hi Shawn,
>
> Could we do this as below?
>
>  for any of true
>
> scala> val df = spark.range(10).selectExpr("id as a", "id / 2 as b")
> df: org.apache.spark.sql.DataFrame = [a: bigint, b: double]
>
> scala> df.filter(_.toSeq.exists(v => v == 1)).show()
> +---+---+
> |  a|  b|
> +---+---+
> |  1|0.5|
> |  2|1.0|
> +---+---+
>
> ​
>
> or for all of true
>
> scala> val df = spark.range(10).selectExpr("id as a", "id / 2 as b")
> df: org.apache.spark.sql.DataFrame = [a: bigint, b: double]
>
> scala> df.filter(_.toSeq.forall(v => v == 0)).show()
> +---+---+
> |  a|  b|
> +---+---+
> |  0|0.0|
> +---+---+
>
> ​
>
>
>
>
>
> 2017-01-17 7:27 GMT+09:00 Shawn Wan :
>
>> I need to filter out outliers from a dataframe by all columns. I can
>> manually list all columns like:
>>
>> df.filter(x=>math.abs(x.get(0).toString().toDouble-means(0))<=3*stddevs(0
>> ))
>>
>> .filter(x=>math.abs(x.get(1).toString().toDouble-means(1))<=3*stddevs
>> (1))
>>
>> ...
>>
>> But I want to turn it into a general function which can handle variable
>> number of columns. How could I do that? Thanks in advance!
>>
>>
>> Regards,
>>
>> Shawn
>>
>> --
>> View this message in context: filter rows by all columns
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>
>


Spark / Elasticsearch Error: Maybe ES was overloaded? How to throttle down Spark as it writes to ES

2017-01-17 Thread Russell Jurney
How can I throttle Spark as it writes to Elasticsearch? I have already
repartitioned down to one partition in an effort to slow the writes. ES
indicates it is being overloaded, and I don't know how to slow things down.
This is all on one r4.xlarge EC2 node that runs Spark with 25GB of RAM and
ES as well.
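
One hedged option, sketched in Scala for brevity (the same es.* options can be
set from PySpark via df.write.option(...); the DataFrame and the index/type name
are placeholders): the elasticsearch-hadoop connector exposes batch-size and
retry settings that effectively throttle each writer, which may help more than
repartitioning alone.

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.batch.size.entries", "500")      // smaller bulk requests
  .option("es.batch.size.bytes", "1mb")
  .option("es.batch.write.retry.count", "10")  // back off instead of failing fast
  .option("es.batch.write.retry.wait", "30s")
  .save("agile_data_science/executive_summary")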

The script:
https://github.com/rjurney/Agile_Data_Code_2/blob/master/ch04/pyspark_to_elasticsearch.py

The error: https://gist.github.com/rjurney/ec0d6b1ef050e3fbead2314255f4b6fa

I asked the question on the Elasticsearch forums and I thought someone here
might know:
https://discuss.elastic.co/t/spark-elasticsearch-exception-maybe-es-was-overloaded/71932

Thanks!
-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


Re: need a hive generic udf which also works on spark sql

2017-01-17 Thread Deepak Sharma
On the SQLContext or HiveContext, you can register the function as a UDF as
below:
hiveSqlContext.udf.register("func_name", func(_: String))

Thanks
Deepak

On Wed, Jan 18, 2017 at 8:45 AM, Sirisha Cheruvu  wrote:

> Hey
>
> Can you send me the source code of the Hive Java UDF which worked in Spark SQL
> and how you registered the function on Spark?
>
>
> On Jan 17, 2017 2:01 PM, "Sirisha Cheruvu"  wrote:
>
> Hi
>
> Has anybody tested a generic UDF with an object inspector
> implementation which successfully ran on both Hive and Spark SQL?
>
> please share me the git hub link or source code file
>
> Thanks in advance
> Sirisha
>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: need a hive generic udf which also works on spark sql

2017-01-17 Thread Sirisha Cheruvu
This error
org.apache.spark.sql.AnalysisException: No handler for Hive udf class
com.nexr.platform.hive.udf.GenericUDFNVL2 because:
com.nexr.platform.hive.udf.GenericUDFNVL2.; line 1 pos 26
at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$
lookupFunction$2.apply(hiveUDFs.scala:105)
at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$
lookupFunction$2.apply(hiveUDFs.scala:64)
at scala.util.Try.getOrElse(Try.scala:77)





My script is this


import org.apache.spark.sql.hive.HiveContext
val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
hc.sql("add jar /home/cloudera/Downloads/genudnvl2.jar");
hc.sql("create temporary function nexr_nvl2 as 'com.nexr.platform.hive.udf.
GenericUDFNVL2'");
hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;
System.exit(0);


On Jan 17, 2017 2:01 PM, "Sirisha Cheruvu"  wrote:

> Hi
>
> Has anybody tested a generic UDF with an object inspector
> implementation which successfully ran on both Hive and Spark SQL?
>
> please share me the git hub link or source code file
>
> Thanks in advance
> Sirisha
>


Re: need a hive generic udf which also works on spark sql

2017-01-17 Thread Deepak Sharma
Did you try this with spark-shell?
Please try this.
$spark-shell --jars /home/cloudera/Downloads/genudnvl2.jar

On the spark shell:
val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
hc.sql("create temporary function nexr_nvl2 as '
com.nexr.platform.hive.udf.GenericUDFNVL2'");
hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;

This should work.

So basically, in your Spark program you can specify the jars while
submitting the job using spark-submit.

There is no need to have an "add jar" statement in the Spark program itself.
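
For a packaged job, the spark-submit form of the same thing would look roughly
like this (the class and application jar names are placeholders):

spark-submit \
  --class com.example.MyUdfJob \
  --jars /home/cloudera/Downloads/genudnvl2.jar \
  myapp.jar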

Thanks
Deepak


On Wed, Jan 18, 2017 at 8:58 AM, Sirisha Cheruvu  wrote:

> This error
> org.apache.spark.sql.AnalysisException: No handler for Hive udf class
> com.nexr.platform.hive.udf.GenericUDFNVL2 because:
> com.nexr.platform.hive.udf.GenericUDFNVL2.; line 1 pos 26
> at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$look
> upFunction$2.apply(hiveUDFs.scala:105)
> at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$look
> upFunction$2.apply(hiveUDFs.scala:64)
> at scala.util.Try.getOrElse(Try.scala:77)
>
>
>
>
>
> My script is this
>
>
> import org.apache.spark.sql.hive.HiveContext
> val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
> hc.sql("add jar /home/cloudera/Downloads/genudnvl2.jar");
> hc.sql("create temporary function nexr_nvl2 as '
> com.nexr.platform.hive.udf.GenericUDFNVL2'");
> hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;
> System.exit(0);
>
>
> On Jan 17, 2017 2:01 PM, "Sirisha Cheruvu"  wrote:
>
>> Hi
>>
>> Has anybody tested a generic UDF with an object inspector
>> implementation which successfully ran on both Hive and Spark SQL?
>>
>> please share me the git hub link or source code file
>>
>> Thanks in advance
>> Sirisha
>>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: need a hive generic udf which also works on spark sql

2017-01-17 Thread Sirisha Cheruvu
Brilliant... it worked!

I should give --jars
instead of an "add jar" statement.

On Jan 18, 2017 9:02 AM, "Deepak Sharma"  wrote:

> Did you tried this with spark-shell?
> Please try this.
> $spark-shell --jars /home/cloudera/Downloads/genudnvl2.jar
>
> On the spark shell:
> val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
> hc.sql("create temporary function nexr_nvl2 as '
> com.nexr.platform.hive.udf.GenericUDFNVL2'");
> hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;
>
> This should work.
>
> So basically in your spark program , you can specify the jars while
> submitting the job using spark-submit.
>
> There is no need to have add jars statement in the spark program itself.
>
> Thanks
> Deepak
>
>
> On Wed, Jan 18, 2017 at 8:58 AM, Sirisha Cheruvu 
> wrote:
>
>> This error
>> org.apache.spark.sql.AnalysisException: No handler for Hive udf class
>> com.nexr.platform.hive.udf.GenericUDFNVL2 because:
>> com.nexr.platform.hive.udf.GenericUDFNVL2.; line 1 pos 26
>> at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$look
>> upFunction$2.apply(hiveUDFs.scala:105)
>> at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$look
>> upFunction$2.apply(hiveUDFs.scala:64)
>> at scala.util.Try.getOrElse(Try.scala:77)
>>
>>
>>
>>
>>
>> My script is this
>>
>>
>> import org.apache.spark.sql.hive.HiveContext
>> val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
>> hc.sql("add jar /home/cloudera/Downloads/genudnvl2.jar");
>> hc.sql("create temporary function nexr_nvl2 as '
>> com.nexr.platform.hive.udf.GenericUDFNVL2'");
>> hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;
>> System.exit(0);
>>
>>
>> On Jan 17, 2017 2:01 PM, "Sirisha Cheruvu"  wrote:
>>
>>> Hi
>>>
>>> Has anybody tested a generic UDF with an object inspector
>>> implementation which successfully ran on both Hive and Spark SQL?
>>>
>>> please share me the git hub link or source code file
>>>
>>> Thanks in advance
>>> Sirisha
>>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: need a hive generic udf which also works on spark sql

2017-01-17 Thread Sirisha Cheruvu
Thank You Deepak

On Jan 18, 2017 9:02 AM, "Deepak Sharma"  wrote:

> Did you tried this with spark-shell?
> Please try this.
> $spark-shell --jars /home/cloudera/Downloads/genudnvl2.jar
>
> On the spark shell:
> val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
> hc.sql("create temporary function nexr_nvl2 as '
> com.nexr.platform.hive.udf.GenericUDFNVL2'");
> hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;
>
> This should work.
>
> So basically in your spark program , you can specify the jars while
> submitting the job using spark-submit.
>
> There is no need to have add jars statement in the spark program itself.
>
> Thanks
> Deepak
>
>
> On Wed, Jan 18, 2017 at 8:58 AM, Sirisha Cheruvu 
> wrote:
>
>> This error
>> org.apache.spark.sql.AnalysisException: No handler for Hive udf class
>> com.nexr.platform.hive.udf.GenericUDFNVL2 because:
>> com.nexr.platform.hive.udf.GenericUDFNVL2.; line 1 pos 26
>> at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$look
>> upFunction$2.apply(hiveUDFs.scala:105)
>> at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$look
>> upFunction$2.apply(hiveUDFs.scala:64)
>> at scala.util.Try.getOrElse(Try.scala:77)
>>
>>
>>
>>
>>
>> My script is this
>>
>>
>> import org.apache.spark.sql.hive.HiveContext
>> val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
>> hc.sql("add jar /home/cloudera/Downloads/genudnvl2.jar");
>> hc.sql("create temporary function nexr_nvl2 as '
>> com.nexr.platform.hive.udf.GenericUDFNVL2'");
>> hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;
>> System.exit(0);
>>
>>
>> On Jan 17, 2017 2:01 PM, "Sirisha Cheruvu"  wrote:
>>
>>> Hi
>>>
>>> Has anybody tested a generic UDF with an object inspector
>>> implementation which successfully ran on both Hive and Spark SQL?
>>>
>>> please share me the git hub link or source code file
>>>
>>> Thanks in advance
>>> Sirisha
>>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


How to access metrics for Structured Streaming 2.1

2017-01-17 Thread Heji Kim
Hello. We are trying to migrate and performance test the kafka sink for
structured streaming in 2.1. Obviously we miss the beautiful Streaming
Statistics ui tab and we are trying to figure out the most reasonable way
to monitor event processing rates and lag time.

1. Are the SourceStatus and SinkStatus mentioned in the "Metrics in
Structured Streaming" design doc

 available with the json or jmx metrics? I can't seem to find any of the
structured streaming metrics in either sink.

2. RDD Streaming metrics provides total counts such as

totalProcessedRecords, totalReceivedRecords

Is there anything similar in Structured Streaming?

3. Are there any plans for supporting something like the Streaming
Statistics tab for structured streaming in the web UI?  Or if we use
structured streaming, should we be expected to integrate our own  reporting
server like ganglia?

Any help is greatly appreciated.

Thanks,
Heji


Re: Debugging a PythonException with no details

2017-01-17 Thread Nicholas Chammas
Hey Marco,

I stopped seeing this error once I started round-tripping intermediate
DataFrames to disk.

You can read more about what I saw here:
https://github.com/graphframes/graphframes/issues/159

Nick

On Sat, Jan 14, 2017 at 4:02 PM Marco Mistroni  wrote:

> It seems it has to do with a UDF. Could you share a snippet of the code you are
> running?
> Kr
>
> On 14 Jan 2017 1:40 am, "Nicholas Chammas" 
> wrote:
>
> I’m looking for tips on how to debug a PythonException that’s very sparse
> on details. The full exception is below, but the only interesting bits
> appear to be the following lines:
>
> org.apache.spark.api.python.PythonException:
> ...
> py4j.protocol.Py4JError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext
>
> Otherwise, the only other clue from the traceback I can see is that the
> problem may involve a UDF somehow.
>
> I’ve tested this code against many datasets (stored as ORC) and it works
> fine. The same code only seems to throw this error on a few datasets that
> happen to be sourced via JDBC. I can’t seem to get a lead on what might be
> going wrong here.
>
> Does anyone have tips on how to debug a problem like this? How do I find
> more specifically what is going wrong?
>
> Nick
>
> Here’s the full exception:
>
> 17/01/13 17:12:14 WARN TaskSetManager: Lost task 7.0 in stage 9.0 (TID 15, 
> devlx023.private.massmutual.com, executor 4): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 
> 161, in main
> func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 97, 
> in read_udfs
> arg_offsets, udf = read_single_udf(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 78, 
> in read_single_udf
> f, return_type = read_command(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 54, 
> in read_command
> command = serializer._read_with_length(file)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 169, in _read_with_length
> return self.loads(obj)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 431, in loads
> return pickle.loads(obj, encoding=encoding)
>   File 
> "/hadoop/yarn/nm/usercache/jenkins/appcache/application_1483203887152_1207/container_1483203887152_1207_01_05/splinkr/person.py",
>  line 111, in 
> py_normalize_udf = udf(py_normalize, StringType())
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1868, in udf
> return UserDefinedFunction(f, returnType)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1826, in __init__
> self._judf = self._create_judf(name)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1830, in _create_judf
> sc = SparkContext.getOrCreate()
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 307, in getOrCreate
> SparkContext(conf=conf or SparkConf())
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 118, in __init__
> conf, jsc, profiler_cls)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 179, in _do_init
> self._jsc = jsc or self._initialize_context(self._conf._jconf)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 246, in _initialize_context
> return self._jvm.JavaSparkContext(jconf)
>   File 
> "/hadoop/spark/2.1/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 
> 1401, in __call__
> answer, self._gateway_client, None, self._fqn)
>   File "/hadoop/spark/2.1/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", 
> line 327, in get_return_value
> format(target_id, ".", name))
> py4j.protocol.Py4JError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext
>
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
> at 
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
> at 
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at 

Re: How to access metrics for Structured Streaming 2.1

2017-01-17 Thread Shixiong(Ryan) Zhu
You can use the monitoring APIs of Structured Streaming to get metrics. See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
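
A minimal sketch of those APIs in Spark 2.1 (assuming a SparkSession named
"spark"): either poll query.lastProgress / query.recentProgress, or register a
listener for push-style metrics such as input and processing rates.

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batch=${p.batchId} input rows/s=${p.inputRowsPerSecond} " +
      s"processed rows/s=${p.processedRowsPerSecond}")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})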

On Tue, Jan 17, 2017 at 5:01 PM, Heji Kim 
wrote:

> Hello. We are trying to migrate and performance test the kafka sink for
> structured streaming in 2.1. Obviously we miss the beautiful Streaming
> Statistics ui tab and we are trying to figure out the most reasonable way
> to monitor event processing rates and lag time.
>
> 1. Are the SourceStatus and SinkStatus mentioned in the "Metrics in
> Structured Streaming" design doc
> 
>  available with the json or jmx metrics? I can't seem to find any of the
> structured streaming metrics in either sink.
>
> 2. RDD Streaming metrics provides total counts such as
>
> totalProcessedRecords, totalReceivedRecords
>
> Is there anything similar in Structured Streaming?
>
> 3. Are there any plans for supporting something like the Streaming
> Statistics tab for structured streaming in the web UI?  Or if we use
> structured streaming, should we be expected to integrate our own  reporting
> server like ganglia?
>
> Any help is greatly appreciated.
>
> Thanks,
> Heji
>
>
>
>


how to dynamically partition a dataframe

2017-01-17 Thread lk_spark
hi, all:
    I want to partition data by reading a config file that tells me how to
partition the current input data.
    DataFrameWriter has a method named partitionBy(colNames: String*):
DataFrameWriter[T].
    Why can't I pass the parameter as a Seq[String] or Array[String]?
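
partitionBy takes a Scala varargs parameter, so a Seq or Array loaded from a
config file can be passed by expanding it with ": _*" -- a minimal sketch
(paths and column names are illustrative):

val partitionCols: Seq[String] = Seq("year", "month", "day")  // e.g. read from the config file

df.write
  .partitionBy(partitionCols: _*)
  .parquet("/data/output")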

2017-01-18


lk_spark 

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Thanks Yong for the response. Adding my responses inline

On Tue, Jan 17, 2017 at 10:27 PM, Yong Zhang  wrote:

> What DB are you using for your Hive metastore, and what types are your
> partition columns?
>
I am using MySql for Hive metastore. Partition columns are  combination of
INT and STRING types.

>
> You may want to read the discussion in SPARK-6910, and especially the
> comments in the PR. There are some limitations around partition pruning in
> Hive/Spark; maybe yours is one of them.
>
It seems I had already gone through SPARK-6910 and all the corresponding PRs.
spark.sql.hive.convertMetastoreParquet   false
spark.sql.hive.metastorePartitionPruning   true
I had set the above properties based on SPARK-6910 and its PRs.


>

> Yong
>
>
> --
> *From:* Raju Bairishetti 
> *Sent:* Tuesday, January 17, 2017 3:00 AM
> *To:* user @spark
> *Subject:* Re: Spark sql query plan contains all the partitions from hive
> table even though filtering of partitions is provided
>
> Had a high level look into the code. Seems getHiveQlPartitions  method
> from HiveMetastoreCatalog is getting called irrespective of 
> metastorePartitionPruning
> conf value.
>
>  It should not fetch all partitions if we set metastorePartitionPruning to
> true (Default value for this is false)
>
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
> table.getPartitions(predicates)
>   } else {
> allPartitions
>   }
>
> ...
>
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
>
> lazy val allPartitions = table.getAllPartitions
>
> But somehow getAllPartitions is getting called even after setting 
> metastorePartitionPruning to true.
>
> Am I missing something or looking at wrong place?
>
>
> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti 
> wrote:
>
>> Waiting for suggestions/help on this...
>>
>> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
>> wrote:
>>
>>> Hello,
>>>
>>> Spark SQL is generating the query plan with all partition information
>>> even if we apply filters on partitions in the query. Due to this, the
>>> Spark driver/Hive metastore is hitting OOM, as each table has lots
>>> of partitions.
>>>
>>> We can confirm from hive audit logs that it tries to *fetch all
>>> partitions* from hive metastore.
>>>
>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>>> cmd=get_partitions : db= tbl=x
>>>
>>>
>>> Configured the following parameters in the spark conf to fix the above
>>> issue(source: from spark-jira & github pullreq):
>>>
>>> *spark.sql.hive.convertMetastoreParquet   false *
>>> *spark.sql.hive.metastorePartitionPruning   true*
>>>
>>>
>>> *   plan:  rdf.explain *
>>> *   == Physical Plan ==*
>>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>>
>>> *get_partitions_by_filter* method is called and fetching only
>>> required partitions.
>>>
>>> But we are seeing parquetDecode errors in our applications
>>> frequently after this. Looks like these decoding errors were because of
>>> changing serde from spark-builtin to hive serde.
>>>
>>> I feel like *fixing query plan generation in spark-sql* is the
>>> right approach instead of forcing users to use the Hive serde.
>>>
>>> Is there any workaround/way to fix this issue? I would like to hear more
>>> thoughts on this :)
>>>
>>> --
>>> Thanks,
>>> Raju Bairishetti,
>>> www.lazada.com
>>>
>>
>>
>>
>> --
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>
>
> --
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Please help me out: getting an error while trying to run a Hive Java generic UDF in Spark

2017-01-17 Thread Sirisha Cheruvu
Hi everyone,


I am getting the error below while running a Hive Java UDF from the SQL context:


org.apache.spark.sql.AnalysisException: No handler for Hive udf class
com.nexr.platform.hive.udf.GenericUDFNVL2 because:
com.nexr.platform.hive.udf.GenericUDFNVL2.; line 1 pos 26
at
org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:105)
at
org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$lookupFunction$2.apply(hiveUDFs.scala:64)
at scala.util.Try.getOrElse(Try.scala:77)





My script is this


import org.apache.spark.sql.hive.HiveContext
val hc = new org.apache.spark.sql.hive.HiveContext(sc) ;
hc.sql("add jar /home/cloudera/Downloads/genudnvl2.jar");
hc.sql("create temporary function nexr_nvl2 as
'com.nexr.platform.hive.udf.GenericUDFNVL2'");
hc.sql("select nexr_nvl2(name,let,ret) from testtab5").show;
System.exit(0);




Attached is the Java Hive UDF which I am trying to run on Spark.

Please help.

Regards,
Sirisha


GenericUDFNVL2.java
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Anyone from Bangalore who wants to work on Spark projects along with me?

2017-01-17 Thread Sirisha Cheruvu
Hi ,

Just thought I'd share my interest in working together with Spark
developers who are also from Bangalore, so that we can brainstorm
together and work out solutions on our projects.


What do you say?

expecting a reply


Re: ScalaReflectionException (class not found) error for user class in spark 2.1.0

2017-01-17 Thread Koert Kuipers
and to be clear, this is not in the REPL or with Hive (both well known
situations in which these errors arise)

On Mon, Jan 16, 2017 at 11:51 PM, Koert Kuipers  wrote:

> i am experiencing a ScalaReflectionException exception when doing an
> aggregation on a spark-sql DataFrame. the error looks like this:
>
> Exception in thread "main" scala.ScalaReflectionException: class
>  in JavaMirror with sun.misc.Launcher$AppClassLoader@28d93b30 of
> type class sun.misc.Launcher$AppClassLoader with classpath []
> not found.
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(
> Mirrors.scala:123)
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(
> Mirrors.scala:22)
> at 
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$
> lzycompute(TypeTags.scala:232)
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
> at org.apache.spark.sql.SQLImplicits$$typecreator9$1.
> apply(SQLImplicits.scala:127)
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$
> lzycompute(TypeTags.scala:232)
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.
> apply(ExpressionEncoder.scala:49)
> at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(
> SQLImplicits.scala:127)
> at 
>
>
> some things to note:
> *  contains driver-class-path as indicated by me using
> spark-submit, and all the jars that spark added. but it does not contain my
> own assembly jar which contains 
> * the class that is missing is a simple case class that is only used in
> the aggregators on the executors, never driver-side
> * i am running spark 2.1.0 with java 8 on yarn, but i can reproduce the
> same error in local mode
>
> what is this classloader that excludes my jar?
> the error looks somewhat like SPARK-8470, but i am not using hive, and
> spark was not built with hive support.
>
> i can fix the error by adding my assembly jar to driver-classpath, but
> that feels like a hack.
>
> thanks,
> koert
>
>


need a hive generic udf which also works on spark sql

2017-01-17 Thread Sirisha Cheruvu
Hi

Has anybody tested and tried a generic UDF with an ObjectInspector
implementation that successfully ran on both Hive and Spark SQL?

Please share the GitHub link or source code file.

Thanks in advance
Sirisha


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Raju Bairishetti
Had a high-level look into the code. It seems the getHiveQlPartitions method from
HiveMetastoreCatalog is getting called irrespective of the
metastorePartitionPruning
conf value.

It should not fetch all partitions if we set metastorePartitionPruning to
true (the default value for this is false).

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
  val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
table.getPartitions(predicates)
  } else {
allPartitions
  }

...

def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
  client.getPartitionsByFilter(this, predicates)

lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is getting called even after setting
metastorePartitionPruning to true.

Am I missing something, or am I looking in the wrong place?


On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti  wrote:

> Waiting for suggestions/help on this...
>
> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
> wrote:
>
>> Hello,
>>
>>    Spark SQL is generating the query plan with all partition information
>> even though we apply filters on partitions in the query. Due to this,
>> the Spark driver/Hive metastore is hitting OOM as each table has lots
>> of partitions.
>>
>> We can confirm from hive audit logs that it tries to *fetch all
>> partitions* from hive metastore.
>>
>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit
>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x
>> cmd=get_partitions : db= tbl=x
>>
>>
>> Configured the following parameters in the spark conf to fix the above
>> issue(source: from spark-jira & github pullreq):
>>
>> *spark.sql.hive.convertMetastoreParquet   false*
>> *spark.sql.hive.metastorePartitionPruning   true*
>>
>>
>> *   plan:  rdf.explain*
>> *   == Physical Plan ==*
>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname,
>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 =
>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>>
>> *get_partitions_by_filter* method is called and fetching only
>> required partitions.
>>
>> But we are seeing parquetDecode errors in our applications frequently
>> after this. Looks like these decoding errors were because of changing
>> serde from spark-builtin to hive serde.
>>
>> I feel like *fixing query plan generation in spark-sql* is the
>> right approach instead of forcing users to use the Hive serde.
>>
>> Is there any workaround/way to fix this issue? I would like to hear more
>> thoughts on this :)
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>
>
> --
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>



-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Re: GroupBy and Spark Performance issue

2017-01-17 Thread Andy Dang
Repartitioning wouldn't save you from skewed data, unfortunately. The way Spark
works now is that groupBy pulls all data for the same key into one single partition,
and Spark, AFAIK, retains the mapping from key to data in memory.

You can use aggregateByKey(), combineByKey() or reduceByKey() to avoid
this problem, because these functions can be evaluated using map-side
aggregation (see the sketch after the link below):
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
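
A sketch of the map-side approach for the max-per-group case (the keys and values below are made up, and sc is assumed to be the shell's SparkContext):

// Sketch only: computes the per-key maximum with reduceByKey, which combines values
// on the map side before shuffling, instead of pulling whole groups into one partition.
val pairs = sc.parallelize(Seq(("a", 3), ("b", 7), ("a", 9), ("b", 2)))
val maxPerKey = pairs.reduceByKey((a, b) => math.max(a, b))
maxPerKey.collect()   // e.g. Array((a,9), (b,7))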


---
Regards,
Andy

On Tue, Jan 17, 2017 at 5:39 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am trying to group data in Spark and find the maximum value for each group
> of data. I have to use group by as I need to transpose based on the values.
>
> I tried to repartition the data by increasing the number from 1 to 1. The job runs
> till the below stage and it takes a long time to move ahead. I was never
> successful; the job gets killed after some time with GC overhead limit issues.
>
>
> [image: Inline image 1]
>
> Increased Memory limits too. Not sure what is going wrong, can anyone
> guide me through right approach.
>
> Thanks,
> Asmath
>


Re: need a hive generic udf which also works on spark sql

2017-01-17 Thread Takeshi Yamamuro
Hi,

AFAIK, you can use Hive GenericUDF stuff in Spark without much effort.
If you'd like to check the test suites for that, have a look at
HiveUDFSuite:
https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala

I have also used Hive UDFs (including GenericUDF) in Spark for another
incubator project I belong to (https://github.com/apache/incubator-hivemall),
and I haven't hit any critical issues so far.
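
For instance, a minimal sketch of registering and calling a Hive GenericUDF from Spark SQL (the class name, function name, and table below are hypothetical, and the UDF jar is assumed to be on the classpath, e.g. via --jars):

// Sketch only: assumes a session built with Hive support.
spark.sql("CREATE TEMPORARY FUNCTION my_nvl2 AS 'com.example.udf.MyGenericUDF'")
spark.sql("SELECT my_nvl2(name, val, fallback) FROM some_table").show()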

// maropu


On Tue, Jan 17, 2017 at 5:31 PM, Sirisha Cheruvu  wrote:

> Hi
>
> Anybody has a test and tried generic udf with object inspector
> implementaion which sucessfully ran on both hive and spark-sql
>
> please share me the git hub link or source code file
>
> Thanks in advance
> Sirisha
>



-- 
---
Takeshi Yamamuro


Spark/Parquet/Statistics question

2017-01-17 Thread djiang
Hi, 

I have been looking into how Spark stores statistics (min/max) in Parquet as
well as how it uses the info for query optimization.
I have got a few questions.
First, the setup: Spark 2.1.0. The following sets up a DataFrame of 1000 rows,
with a long-type and a string-type column.
The two outputs are sorted by different columns, though.

scala> spark.sql("select id, cast(id as string) text from
range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
scala> spark.sql("select id, cast(id as string) text from
range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")

I added some code to parquet-tools to print out stats and examine the
generated parquet files:

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
 
file:   
file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
 
creator: parquet-mr version 1.8.1 (build
4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:   org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
 

file schema: spark_schema 

id:  REQUIRED INT64 R:0 D:0
text:REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:133 OFFSET:4 

id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
ENC:PLAIN,BIT_PACKED

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
 
file:   
file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
 
creator: parquet-mr version 1.8.1 (build
4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:   org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
 

file schema: spark_schema 

id:  REQUIRED INT64 R:0 D:0
text:REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:140 OFFSET:4 

id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
ENC:PLAIN,BIT_PACKED

So the question is: why does Spark, particularly 2.1.0, only generate min/max
for numeric columns, but not for string (BINARY) fields, even if the string
field is included in the sort? Maybe I missed a configuration?

The second question is: how can I confirm Spark is utilizing the min/max?
scala> sc.setLogLevel("INFO")
scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where
id=4").show
I got many lines like this:
17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
file:///secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
range: 0-558, partition values: [empty row]
...
17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
range: 0-574, partition values: [empty row]
...

The question is that it looks like Spark is scanning every file, even though, from the
min/max, Spark should be able to determine that only part-0 has the relevant
data. Or maybe I am reading it wrong and Spark is skipping the files? Maybe
Spark can only use partition values for data skipping?
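
As a hedged sketch of one way to inspect this (the path matches the example above): the physical plan usually shows which filters were pushed down to the Parquet data source, while the decision to skip individual row groups is made at read time by the Parquet reader using the column statistics.

// Sketch only: check the scan node of the physical plan for the pushed-down filters.
val df = spark.read.parquet("/secret/spark21-sortById")
df.filter("id = 4").explain()
// The FileScan node typically lists something like:
//   PushedFilters: [IsNotNull(id), EqualTo(id,4)]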

Thanks,

Dong





-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Yong Zhang
What DB are you using for your Hive metastore, and what types are your
partition columns?


You may want to read the discussion in SPARK-6910, and especially the
comments in the PR. There are some limitations around partition pruning in
Hive/Spark; maybe yours is one of them.


Yong



From: Raju Bairishetti 
Sent: Tuesday, January 17, 2017 3:00 AM
To: user @spark
Subject: Re: Spark sql query plan contains all the partitions from hive table 
even though filtering of partitions is provided

Had a high level look into the code. Seems getHiveQlPartitions  method from 
HiveMetastoreCatalog is getting called irrespective of 
metastorePartitionPruning conf value.

 It should not fetch all partitions if we set metastorePartitionPruning to true 
(Default value for this is false)

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
  val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
table.getPartitions(predicates)
  } else {
allPartitions
  }

...

def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
  client.getPartitionsByFilter(this, predicates)

lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is getting called even after setting
metastorePartitionPruning to true.

Am I missing something or looking at wrong place?

On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti 
> wrote:
Waiting for suggestions/help on this...

On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti 
> wrote:
Hello,

   Spark SQL is generating the query plan with all partition information even
though we apply filters on partitions in the query. Due to this, the Spark
driver/Hive metastore is hitting OOM as each table has lots of
partitions.

We can confirm from hive audit logs that it tries to fetch all partitions from 
hive metastore.

 2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
(HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
cmd=get_partitions : db= tbl=x


Configured the following parameters in the spark conf to fix the above 
issue(source: from spark-jira & github pullreq):
spark.sql.hive.convertMetastoreParquet   false
spark.sql.hive.metastorePartitionPruning   true

   plan:  rdf.explain
   == Physical Plan ==
   HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 28),(hour#317 
= 2),(venture#318 = DEFAULT)]

get_partitions_by_filter method is called and fetching only required 
partitions.

But we are seeing parquetDecode errors in our applications frequently after 
this. Looks like these decoding errors were because of changing serde from 
spark-builtin to hive serde.

I feel like fixing query plan generation in spark-sql is the right
approach instead of forcing users to use the Hive serde.

Is there any workaround/way to fix this issue? I would like to hear more 
thoughts on this :)

--
Thanks,
Raju Bairishetti,
www.lazada.com



--

--
Thanks,
Raju Bairishetti,
www.lazada.com



--

--
Thanks,
Raju Bairishetti,
www.lazada.com


Spark ANSI SQL Support

2017-01-17 Thread Rishabh Bhardwaj
Hi All,

Does Spark 2.0 Sql support full ANSI SQL query standards?

Thanks,
Rishabh.


Re: Spark ANSI SQL Support

2017-01-17 Thread Deepak Sharma
From the Spark documentation page:
Spark SQL can now run all 99 TPC-DS queries.

On Jan 18, 2017 9:39 AM, "Rishabh Bhardwaj"  wrote:

> Hi All,
>
> Does Spark 2.0 Sql support full ANSI SQL query standards?
>
> Thanks,
> Rishabh.
>


Re: Spark / Elasticsearch Error: Maybe ES was overloaded? How to throttle down Spark as it writes to ES

2017-01-17 Thread Koert Kuipers
in our experience you can't, really.
there are some settings to make spark wait longer before retrying when es
is overloaded, but i have never found them to be of much use.

check out these settings, maybe they are of some help:
es.batch.size.bytes
es.batch.size.entries
es.http.timeout
es.batch.write.retry.count
es.batch.write.retry.wait
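
For what it's worth, a hedged sketch of passing these settings when writing a DataFrame with the elasticsearch-spark connector (the node address, index name, and values below are assumptions, not recommendations):

// Sketch only: assumes the elasticsearch-spark (es-hadoop) connector is on the classpath
// and "df" is the DataFrame being indexed.
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "localhost:9200")
  .option("es.batch.size.entries", "500")      // send smaller bulk requests
  .option("es.batch.write.retry.count", "10")  // retry more times when ES pushes back
  .option("es.batch.write.retry.wait", "30s")  // back off longer between retries
  .mode("append")
  .save("myindex/docs")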


On Tue, Jan 17, 2017 at 10:13 PM, Russell Jurney 
wrote:

> How can I throttle Spark as it writes to Elasticsearch? I have already
> repartitioned down to one partition in an effort to slow the writes. ES
> indicates it is being overloaded, and I don't know how to slow things down.
> This is all on one r4.xlarge EC2 node that runs Spark with 25GB of RAM and
> ES as well.
>
> The script: https://github.com/rjurney/Agile_Data_Code_2/
> blob/master/ch04/pyspark_to_elasticsearch.py
>
> The error: https://gist.github.com/rjurney/ec0d6b1ef050e3fbead2314255f4b6
> fa
>
> I asked the question on the Elasticsearch forums and I thought someone
> here might know: https://discuss.elastic.co/t/
> spark-elasticsearch-exception-maybe-es-was-overloaded/71932
>
> Thanks!
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>