Re: LIMIT issue of SparkSQL

2016-10-23 Thread Michael Armbrust
- dev + user

Can you give more info about the query?  Maybe a full explain()?  Are you
using a datasource like JDBC?  The API does not currently push down limits,
but the documentation talks about how you can use a query instead of a
table if that is what you are looking to do.
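
For reference, a hedged sketch of the "query instead of a table" approach for a
JDBC source, so that the LIMIT runs on the database side; the connection details
below are placeholders:

val limited = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")      // placeholder connection URL
  .option("dbtable", "(SELECT * FROM A LIMIT 500) AS tmp")  // a query in place of a table name
  .option("user", "user")
  .option("password", "password")
  .load()

limited.explain()   // worth checking what the physical plan actually does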

On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai  wrote:

> Hi all,
>
> Let me clarify the problem:
>
> Suppose we have a simple table `A` with 100 000 000 records
>
> Problem:
> When we execute the SQL query `select * from A limit 500`,
> it scans through all 100 000 000 records.
> The expected behaviour is that the engine stops scanning once 500 records
> have been found.
>
> Detailed observation:
> We found that there are “GlobalLimit / LocalLimit” physical operators
> https://github.com/apache/spark/blob/branch-2.0/sql/
> core/src/main/scala/org/apache/spark/sql/execution/limit.scala
> But during query plan generation, GlobalLimit / LocalLimit is not applied
> to the query plan.
>
> Could you please help us investigate this LIMIT problem?
> Thanks.
>
> Best,
> Liz
>
> On 23 Oct 2016, at 10:11 PM, Xiao Li  wrote:
>
> Hi, Liz,
>
> CollectLimit means `Take the first `limit` elements and collect them to a
> single partition.`
>
> Thanks,
>
> Xiao
>
> 2016-10-23 5:21 GMT-07:00 Ran Bai :
>
>> Hi all,
>>
>> I found that the runtime of a query with or without the “LIMIT” keyword is
>> the same. We looked into it and found that there is a “GlobalLimit /
>> LocalLimit” in the logical plan, but no corresponding operator in the
>> physical plan. Is this a bug or something else? Attached are the logical
>> and physical plans when running "SELECT * FROM seq LIMIT 1".
>>
>>
>> More specifically, we expected an early stop once enough results had been
>> collected.
>> Thanks so much.
>>
>> Best,
>> Liz
>>
>>
>>
>>
>>
>
>
>


How to avoid the delay associated with Hive Metastore when loading parquet?

2016-10-23 Thread ankits
Hi,

I'm loading parquet files via Spark, and the first time a file is loaded I
see a 5-10s delay related to the Hive Metastore, with metastore messages in
the console. How can I avoid this delay and keep the metadata around? I want
the metadata to be persisted even after killing the JVM/SparkSession, and to
avoid this delay.

I have configured hive-site to use a MySQL DB as the metastore - I thought
that would solve the problem by giving it a persistent metastore, but that
did not help, so I don't quite understand what's going on. How do I keep the
metadata around and avoid the delay?
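
For illustration, a minimal sketch (assuming the goal is metadata that survives a
restart): enable Hive support so the MySQL-backed metastore from hive-site.xml is
used, and register the parquet data as a table instead of re-reading the raw files
every time. Table and path names below are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-with-metastore")
  .enableHiveSupport()              // picks up hive-site.xml from the classpath
  .getOrCreate()

// Write once, registering the table (and its schema) in the metastore:
spark.read.parquet("/data/events.parquet")
  .write.mode("overwrite").saveAsTable("events")

// Later sessions can query the table directly instead of re-reading the raw files:
val df = spark.table("events")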


Here is the relevant code and config

*Initializing the SparkSession, storing and reading data via parquet* 


*hive-site.xml*


*Console output*










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-the-delay-associated-with-Hive-Metastore-when-loading-parquet-tp27948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: RDD groupBy() then random sort each group ?

2016-10-23 Thread Yang
Thanks, this direction seems to be in line with what I want.

What I really want is: groupBy() and then, for the rows in each group, get an
Iterator and run each element from the iterator through a local function
(specifically SGD). Right now the Dataset API provides this, but it is
literally an Iterator, so I can't "reset" it, while SGD does need the ability
to run multiple passes over the data.
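
For reference, a rough sketch of the Dataset route described above, with placeholder
types and names: groupByKey + mapGroups hands each group to a local function as a
one-pass Iterator, so a solver that needs several passes has to buffer the group
first, which brings back the memory cost discussed in the quoted mail below.

case class Sample(model_id: String, features: Array[Double], label: Double) // placeholder row type

def trainSgd(rows: Seq[Sample]): Double = ???   // placeholder for the local SGD routine

// assumes `import spark.implicits._` is in scope for the tuple encoder
val perModel = inputDS                          // inputDS: Dataset[Sample], also a placeholder
  .groupByKey(_.model_id)
  .mapGroups { (modelId, rows) =>
    val buffered = rows.toArray                 // multiple passes force materializing the group
    (modelId, trainSgd(buffered))
  }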



On Sat, Oct 22, 2016 at 1:22 PM, Koert Kuipers  wrote:

> groupBy always materializes the entire group (on disk or in memory) which
> is why you should avoid it for large groups.
>
> The key is to never materialize the grouped and shuffled data.
>
> To see one approach to do this take a look at
> https://github.com/tresata/spark-sorted
>
> It's basically a combination of smart partitioning and secondary sort.
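
For illustration, a minimal sketch of the smart-partitioning-plus-secondary-sort
idea using plain Spark APIs rather than spark-sorted itself; the field names,
types and partition count are illustrative:

import org.apache.spark.Partitioner

// Partition on model_id only, so every row of a model lands in the same partition.
class ModelPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (modelId: String, _) =>
      ((modelId.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}

// Composite key: (model_id, random); the random part randomises order inside a group.
val keyed = my_input_rdd.map(x => ((x.model_id, scala.util.Random.nextDouble()), x))

// Secondary sort: partitioned by model_id, ordered by (model_id, random) per partition.
val sorted = keyed.repartitionAndSortWithinPartitions(new ModelPartitioner(200))

// Each partition can now be consumed as a stream (e.g. with mapPartitions), splitting
// on model_id changes, without ever collecting a whole group into a Seq.
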
>
> On Oct 20, 2016 1:55 PM, "Yang"  wrote:
>
>> In my application, I group training samples by their model_id (the input
>> table contains training samples for 100k different models), and each group
>> ends up having about 1 million training samples.
>>
>> then I feed that group of samples to a little Logistic Regression solver
>> (SGD), but SGD requires the input data to be shuffled randomly (so that
>> positive and negative samples are evenly distributed), so now I do
>> something like
>>
>> my_input_rdd.groupBy(x => x.model_id).map { x =>
>>   val (model_id, group_of_rows) = x
>>   (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
>> }.map(x => (x._1, train_sgd(x._2)))
>>
>>
>> The issue is that on the third line above I had to explicitly call toSeq()
>> on group_of_rows, which is an Iterable and not a Seq, in order to shuffle
>> it. Now the entire 1 million rows have to be loaded into memory, and in
>> practice I've seen my tasks OOM and GC time go crazy (about 50% of total
>> run time). I suspect this toSeq() is the reason, since doing a simple
>> count() on the groupBy() result works fine.
>>
>> I am planning to shuffle my_input_rdd first and then groupBy(), without
>> the toSeq() shuffle. Intuitively the input RDD is already shuffled, so
>> UNLESS groupBy() does some sorting, the rows in each group SHOULD remain
>> shuffled, but overall this remains rather flimsy.
>>
>> any ideas to do this more reliably?
>>
>> thanks!
>>
>>


Re: RDD groupBy() then random sort each group ?

2016-10-23 Thread Yang
Thanks.

This is exactly what I ended up doing. Though it seemed to work, there seems
to be no guarantee that the randomness from sortWithinPartitions() is
preserved after I do a further groupBy.



On Fri, Oct 21, 2016 at 3:55 PM, Cheng Lian  wrote:

> I think it would be much easier to use the DataFrame API to do this, by
> doing a local sort using randn() as the key. For example, in Spark 2.0:
>
> val df = spark.range(100)
> val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42))
>
> Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the key
> to group by; then you can get the RDD from shuffled and run whatever
> operations you want on it.
>
> Cheng
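
A possible hedged variant of the snippet above: adding the grouping key as the
leading sort column keeps each group contiguous inside its partition while still
randomising the row order within each group, so the groups can then be walked with
mapPartitions instead of a further groupBy (which would not preserve any ordering):

// assumes import spark.implicits._ and org.apache.spark.sql.functions.randn
val shuffledByGroup = df
  .repartition($"id" % 10)
  .sortWithinPartitions(($"id" % 10).asc, randn(42))
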
>
>
>
> On 10/20/16 10:53 AM, Yang wrote:
>
>> In my application, I group training samples by their model_id (the input
>> table contains training samples for 100k different models), and each group
>> ends up having about 1 million training samples.
>>
>> then I feed that group of samples to a little Logistic Regression solver
>> (SGD), but SGD requires the input data to be shuffled randomly (so that
>> positive and negative samples are evenly distributed), so now I do
>> something like
>>
>> my_input_rdd.groupBy(x => x.model_id).map { x =>
>>   val (model_id, group_of_rows) = x
>>   (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
>> }.map(x => (x._1, train_sgd(x._2)))
>>
>>
>> The issue is that on the third line above I had to explicitly call toSeq()
>> on group_of_rows, which is an Iterable and not a Seq, in order to shuffle
>> it. Now the entire 1 million rows have to be loaded into memory, and in
>> practice I've seen my tasks OOM and GC time go crazy (about 50% of total
>> run time). I suspect this toSeq() is the reason, since doing a simple
>> count() on the groupBy() result works fine.
>>
>> I am planning to shuffle my_input_rdd first and then groupBy(), without
>> the toSeq() shuffle. Intuitively the input RDD is already shuffled, so
>> UNLESS groupBy() does some sorting, the rows in each group SHOULD remain
>> shuffled, but overall this remains rather flimsy.
>>
>> any ideas to do this more reliably?
>>
>> thanks!
>>
>>
>


Re: PostgresSql queries vs spark sql

2016-10-23 Thread Selvam Raman
I found it. We can use pivot, which is similar to crosstab in Postgres.

Thank you.
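
For illustration, a hedged sketch of the pivot equivalent of the crosstab in
Example 2 quoted below, assuming the ct rows are already loaded into a DataFrame
ctDf with columns rowid, attribute and value:

import org.apache.spark.sql.functions.first

val pivoted = ctDf
  .groupBy("rowid")
  .pivot("attribute", Seq("att2", "att3"))   // listing the pivot values avoids an extra pass
  .agg(first("value"))

// pivoted.show() should print roughly:
// +-----+----+----+
// |rowid|att2|att3|
// +-----+----+----+
// |test1|val2|val3|
// |test2|val6|val7|
// +-----+----+----+
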
On Oct 17, 2016 10:00 PM, "Selvam Raman"  wrote:

> Hi,
>
> Please share some ideas if you have worked on this before.
> How can I implement the Postgres CROSSTAB function in Spark?
>
> Postgres Example
>
> Example 1:
>
> SELECT mthreport.*
>   FROM crosstab(
>     'SELECT i.item_name::text As row_name,
>             to_char(if.action_date, ''mon'')::text As bucket,
>             SUM(if.num_used)::integer As bucketvalue
>      FROM inventory As i INNER JOIN inventory_flow As if
>        ON i.item_id = if.item_id
>       AND action_date BETWEEN date ''2007-01-01'' and date ''2007-12-31 23:59''
>      GROUP BY i.item_name, to_char(if.action_date, ''mon''),
>               date_part(''month'', if.action_date)
>      ORDER BY i.item_name',
>     'SELECT to_char(date ''2007-01-01'' + (n || '' month'')::interval, ''mon'') As short_mname
>      FROM generate_series(0,11) n')
>   As mthreport(item_name text, jan integer, feb integer, mar integer,
>                apr integer, may integer, jun integer, jul integer,
>                aug integer, sep integer, oct integer, nov integer,
>                dec integer)
>
> The output of the above crosstab looks as follows:
> [image: crosstab source_sql cat_sql example]
>
> Example 2:
>
> CREATE TABLE ct(id SERIAL, rowid TEXT, attribute TEXT, value TEXT);
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att1','val1');
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att2','val2');
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att3','val3');
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att4','val4');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att1','val5');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att2','val6');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att3','val7');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att4','val8');
>
> SELECT *
> FROM crosstab(
>   'select rowid, attribute, value
>from ct
>where attribute = ''att2'' or attribute = ''att3''
>order by 1,2')
> AS ct(row_name text, category_1 text, category_2 text, category_3 text);
>
>  row_name | category_1 | category_2 | category_3
> ----------+------------+------------+------------
>  test1    | val2       | val3       |
>  test2    | val6       | val7       |
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Spark submit running spark-sql-perf and additional jar

2016-10-23 Thread Mr rty ff
Hi
I run the following script

home/spark-2.0.1-bin-hadoop2.7/bin/spark-submit --conf  "someconf" "--jars 
/home/user/workspace/auxdriver/target/auxdriver.jar,/media/sf_VboxShared/tpc-ds/spark-sql-perf-v.0.2.4/spark-sql-perf-assembly-0.2.4.jar
 --benchmark DatabasePerformance --iterations 1 --sparkMaster somemaster 
--location "location" --scaleFactor 50 --appName "nvm"
I get the following error
Error: Cannot load main class from JAR 
file:/home/irina/workspace/stocator-s3/target/stocator-s3-1.0-jar-with-dependencies.jar,/media/sf_VboxShared/tpc-ds/spark-sql-perf-v.0.2.4/spark-sql-perf-assembly-0.2.4.jar
If I change the order I get
Error: Unknown argument '/home/user/workspace/auxdriver/target/auxdriver.jar'

If I run without auxdriver it works OK, but I need to run with it in order to test
the driver.

What is the proper way to run spark-submit with two jars when one of them takes
parameters?
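
For reference, a hedged sketch of the usual shape of such an invocation: only the jar
containing the main class is passed as the application jar, the auxiliary jar goes
into --jars (comma-separated, without a stray quote swallowing the rest of the
command), and everything after the application jar is handed to that application as
its own arguments. Paths and options below are copied from the mail and may need
adjusting:

/home/spark-2.0.1-bin-hadoop2.7/bin/spark-submit \
  --conf "someconf" \
  --jars /home/user/workspace/auxdriver/target/auxdriver.jar \
  /media/sf_VboxShared/tpc-ds/spark-sql-perf-v.0.2.4/spark-sql-perf-assembly-0.2.4.jar \
  --benchmark DatabasePerformance --iterations 1 --sparkMaster somemaster \
  --location "location" --scaleFactor 50 --appName "nvm"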




Random forest classifier error : Size exceeds Integer.MAX_VALUE

2016-10-23 Thread Kürşat Kurt
Hi;



I am trying to train a random forest classifier.

I have a predefined classification set (classifications.csv, ~300,000 lines).

While fitting, I am getting a "Size exceeds Integer.MAX_VALUE" error.

Here is the code:

 

object Test1 {

  var savePath = "c:/Temp/SparkModel/"
  var stemmer = Resha.Instance

  var STOP_WORDS: Set[String] = Set()

  def cropSentence(s: String) = {
    s.replaceAll("\\([^\\)]*\\)", "")
      .replaceAll(" - ", " ")
      .replaceAll("-", " ")
      .replaceAll("  +", " ")
      .replaceAll(",", " ").trim()
  }

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("Test").setMaster("local[*]")
      .set("spark.sql.warehouse.dir", "D:/Temp/wh")
      .set("spark.executor.memory", "12g")
      .set("spark.driver.memory", "4g")
      .set("spark.hadoop.validateOutputSpecs", "false")

    val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
    import spark.implicits._

    // Build (className, stemmed product name) pairs from the CSV.
    val mainDataset = spark.sparkContext
      .textFile("file:///C:/Temp/classifications.csv")
      .map(_.split(";"))
      .map { tokens =>
        val list = new ListBuffer[String]()
        val token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")))
        token0.split("\\s+").foreach(w => list += stemmer.stem(w))
        (tokens(1), list.toList.mkString(" "))
      }.toDF("className", "productName")

    val classIndexer = new StringIndexer()
      .setInputCol("className")
      .setOutputCol("label")

    val classIndexerModel = classIndexer.fit(mainDataset)
    val mainDS = classIndexerModel.transform(mainDataset)
    classIndexerModel.write.overwrite.save(savePath + "ClassIndexer")

    // Tokenizer
    val tokenizer = new Tokenizer()
      .setInputCol("productName")
      .setOutputCol("words_nonfiltered")

    // StopWords
    val remover = new StopWordsRemover()
      .setInputCol("words_nonfiltered")
      .setOutputCol("words")
      .setStopWords(Array[String]("stop1", "stop2", "stop3"))

    // CountVectorizer
    val countVectorizer = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")

    val rfc = new RandomForestClassifier()
      .setLabelCol("label")
      .setNumTrees(50)
      .setMaxDepth(15)
      .setFeatureSubsetStrategy("auto")
      .setFeaturesCol("features")
      .setImpurity("gini")
      .setMaxBins(32)

    val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, rfc))
    val train = mainDS
    val model = pipeline.fit(train)   // <= EXCEPTION is thrown here
    model.write.overwrite.save(savePath + "RandomForestClassifier")
  }
}

 

 

16/10/23 19:10:37 INFO scheduler.DAGScheduler: Job 8 failed: collectAsMap at RandomForest.scala:550, took 848.552917 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 36, localhost): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:674)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
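
For what it is worth, a hedged sketch of one commonly suggested mitigation, assuming
the failure comes from a single cached block/partition growing past the 2 GB limit:
spread the training data over more, smaller partitions before fitting. The partition
count below is only an illustrative value.

val train = mainDS.repartition(200)   // more, smaller partitions; 200 is arbitrary
val model = pipeline.fit(train)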

 

Spark streaming crashes with high throughput

2016-10-23 Thread Jeyhun Karimov
Hi,

I am getting


*Remote RPC client disassociated. Likely due to containers exceeding
thresholds, or network issues. Check driver logs for WARN messages.*

error with a Spark Streaming job. I am using Spark 2.0.0. The job is a simple
windowed aggregation and the stream is read from a socket. Average throughput
is 220K tuples per second. The job runs as expected for a while (approx. 10
minutes), then I get the message above.
There is a similar thread on the mailing list, but no conclusion was reached there.
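
For reference, a minimal sketch of settings that are often suggested for this symptom
when containers are killed for exceeding memory limits; the keys are standard Spark
configuration options, but the values are illustrative only and depend on the cluster:

val conf = new org.apache.spark.SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "2048") // MB of off-heap headroom per executor (YARN only)
  .set("spark.network.timeout", "300s")              // more tolerance before peers are declared dead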

Thanks
Jeyhun
-- 
-Cheers

Jeyhun


Re: HashingTF for TF.IDF computation

2016-10-23 Thread Ciumac Sergiu
Thanks Yanbo!

On Sun, Oct 23, 2016 at 1:57 PM, Yanbo Liang  wrote:

> HashingTF was not designed to handle your case; you can try
> CountVectorizer, which will keep the original terms as the vocabulary so
> they can be retrieved. CountVectorizer will compute a global term-to-index
> map, which can be expensive for a large corpus and carries a risk of OOM.
> IDF can accept feature vectors generated by either HashingTF or
> CountVectorizer.
> FYI http://spark.apache.org/docs/latest/ml-features.html#tf-idf
>
> Thanks
> Yanbo
>
> On Thu, Oct 20, 2016 at 10:00 AM, Ciumac Sergiu 
> wrote:
>
>> Hello everyone,
>>
>> I'm having a usage issue with HashingTF class from Spark MLLIB.
>>
>> I'm computing TF.IDF on a set of terms/documents, which I later use to
>> identify the most important terms in each input document.
>>
>> Below is a short code snippet which outlines the example (2 documents
>> with 2 words each, executed on Spark 2.0).
>>
>> val documentsToEvaluate = sc.parallelize(Array(Seq("Mars", 
>> "Jupiter"),Seq("Venus", "Mars")))
>> val hashingTF = new HashingTF()
>> val tf = hashingTF.transform(documentsToEvaluate)
>> tf.cache()
>> val idf = new IDF().fit(tf)
>> val tfidf: RDD[Vector] = idf.transform(tf)
>> documentsToEvaluate.zip(tfidf).saveAsTextFile("/tmp/tfidf")
>>
>> The computation yields the following result:
>>
>> (List(Mars, Jupiter),(1048576,[593437,962819],[0.4054651081081644,0.0]))
>> (List(Venus, Mars),(1048576,[798918,962819],[0.4054651081081644,0.0]))
>>
>> My concern is that I can't get a mapping from the TF.IDF weights to the
>> initial terms used (i.e. Mars : 0.0, Jupiter : 0.4, Venus : 0.4; you may
>> notice that the weights and term indices do not correspond after zipping
>> the 2 sequences). I can only identify the hash mappings (i.e. 593437 : 0.4).
>>
>> I know HashingTF uses the hashing trick to compute TF. My question is: is
>> it possible to retrieve the term/weight mapping, or was HashingTF not
>> designed to handle this use case? If the latter, what other implementation
>> of TF.IDF would you recommend?
>>
>> I may continue the computation with the (*hash:weight*) tuples, though
>> getting the initial (*term:weight*) mapping would make debugging a lot
>> easier later down the pipeline.
>>
>> Any response will be greatly appreciated!
>>
>> Regards, Sergiu Ciumac
>>
>
>


Dataflow of Spark/Hadoop in steps

2016-10-23 Thread Or Raz
I would like to know, if I have 100 GB of data and I want to find the most
common word, what actually goes on in my cluster (let's say a master node and
6 workers), step by step (1).
What does the master do (2)? Start the MapReduce job, monitor the traffic and
return the result? The same goes for the mappers and reducers: what do they
do, and can they be on different nodes/workers (3)?
Do the reducers always wait for all the mappers to finish before they start
(4)? And who combines/assembles the final output?

For example:

This is the input to the reducers, as tuples (I use an easy example in which
each word is unique to one reducer, which means the shuffle step has been done
correctly):

reducer 1: {dog,1} ,{banana,1},{oreo,6} 
reducer 2: {peach,1} ,{mesut,5},{ozil,10} 
reducer 3: {I,4} ,{witch,2} 
reducer 4: {fear,1} ,{goal,6},{arsenal,3}

The output of each reducer should be :

reducer 1: {oreo,6} 
reducer 2: {ozil,10} 
reducer 3: {I,4} 
reducer 4: {goal,6}

Now we need to combine the results: to whom do we send the output so that it
can be sorted and aggregated (the master?) (5)? And in these steps, and before
them, where are the I/O calls (6) (when is the data stored on local disk and
when on HDFS)?
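
For concreteness, a hedged sketch of the word-count flow being discussed, written in
Spark RDD terms (the path is a placeholder): the map side emits (word, 1) pairs,
reduceByKey is where the shuffle happens, and the most common word is taken from the
reduced output.

val counts = sc.textFile("hdfs:///data/input")   // input splits are read from HDFS by the workers
  .flatMap(_.split("\\s+"))                      // map phase: one record per word
  .map(word => (word, 1))
  .reduceByKey(_ + _)                            // shuffle + reduce phase (combiners run map-side)
val mostCommon = counts.sortBy(_._2, ascending = false).take(1)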

In addition, in Hadoop, as far as I know, we need to deploy the Map and Reduce
functions to the matching workers. Can we change the functions at run time (7)?
And if we have done a MapReduce job and want to run another one over our
results, do we have to split the data again and send it to each worker (8)?

P.S. I have numbered the questions so it is clear what I am asking.

Any more comments or notions would be appreciated :)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dataflow-of-Spark-Hadoop-in-steps-tp27946.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: HashingTF for TF.IDF computation

2016-10-23 Thread Yanbo Liang
HashingTF was not designed to handle your case; you can try CountVectorizer,
which will keep the original terms as the vocabulary so they can be retrieved.
CountVectorizer will compute a global term-to-index map, which can be
expensive for a large corpus and carries a risk of OOM. IDF can accept
feature vectors generated by either HashingTF or CountVectorizer.
FYI http://spark.apache.org/docs/latest/ml-features.html#tf-idf
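
For illustration, a hedged sketch of the CountVectorizer route in the DataFrame-based
spark.ml API, using the two toy documents from the original mail; column names are
only examples:

import org.apache.spark.ml.feature.{CountVectorizer, IDF}

val docs = spark.createDataFrame(Seq(
  (0, Seq("Mars", "Jupiter")),
  (1, Seq("Venus", "Mars"))
)).toDF("id", "terms")

val cvModel = new CountVectorizer().setInputCol("terms").setOutputCol("tf").fit(docs)
val tf = cvModel.transform(docs)
val idfModel = new IDF().setInputCol("tf").setOutputCol("tfidf").fit(tf)
val tfidf = idfModel.transform(tf)

// cvModel.vocabulary maps vector indices back to the original terms, which is the
// term-to-weight mapping HashingTF cannot provide.
cvModel.vocabulary.zipWithIndex.foreach(println)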

Thanks
Yanbo

On Thu, Oct 20, 2016 at 10:00 AM, Ciumac Sergiu 
wrote:

> Hello everyone,
>
> I'm having a usage issue with HashingTF class from Spark MLLIB.
>
> I'm computing TF.IDF on a set of terms/documents, which I later use to
> identify the most important terms in each input document.
>
> Below is a short code snippet which outlines the example (2 documents with
> 2 words each, executed on Spark 2.0).
>
> val documentsToEvaluate = sc.parallelize(Array(Seq("Mars", 
> "Jupiter"),Seq("Venus", "Mars")))
> val hashingTF = new HashingTF()
> val tf = hashingTF.transform(documentsToEvaluate)
> tf.cache()
> val idf = new IDF().fit(tf)
> val tfidf: RDD[Vector] = idf.transform(tf)
> documentsToEvaluate.zip(tfidf).saveAsTextFile("/tmp/tfidf")
>
> The computation yields the following result:
>
> (List(Mars, Jupiter),(1048576,[593437,962819],[0.4054651081081644,0.0]))
> (List(Venus, Mars),(1048576,[798918,962819],[0.4054651081081644,0.0]))
>
> My concern is that I can't get a mapping from the TF.IDF weights to the
> initial terms used (i.e. Mars : 0.0, Jupiter : 0.4, Venus : 0.4; you may
> notice that the weights and term indices do not correspond after zipping
> the 2 sequences). I can only identify the hash mappings (i.e. 593437 : 0.4).
>
> I know HashingTF uses the hashing trick to compute TF. My question is: is
> it possible to retrieve the term/weight mapping, or was HashingTF not
> designed to handle this use case? If the latter, what other implementation
> of TF.IDF would you recommend?
>
> I may continue the computation with the (*hash:weight*) tuples, though
> getting the initial (*term:weight*) mapping would make debugging a lot
> easier later down the pipeline.
>
> Any response will be greatly appreciated!
>
> Regards, Sergiu Ciumac
>