Re: LIMIT issue of SparkSQL
-dev +user Can you give more info about the query? Maybe a full explain()? Are you using a data source like JDBC? The API does not currently push down limits, but the documentation describes how you can use a query instead of a table, if that is what you are looking to do. On Mon, Oct 24, 2016 at 5:40 AM, Liz Bai wrote: > Hi all, > > Let me clarify the problem: > > Suppose we have a simple table `A` with 100 000 000 records. > > Problem: > When we execute the SQL query `select * from A limit 500`, > it scans through all 100 000 000 records. > The expected behaviour is that once 500 records are found, the engine stops > scanning. > > Detailed observation: > We found that there are “GlobalLimit / LocalLimit” physical operators > https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala > But during query plan generation, GlobalLimit / LocalLimit is not applied > to the query plan. > > Could you please help us inspect the LIMIT problem? > Thanks. > > Best, > Liz > > On 23 Oct 2016, at 10:11 PM, Xiao Li wrote: > > Hi, Liz, > > CollectLimit means `Take the first `limit` elements and collect them to a > single partition.` > > Thanks, > > Xiao > > 2016-10-23 5:21 GMT-07:00 Ran Bai: > >> Hi all, >> >> I found that the runtime for a query with or without the “LIMIT” keyword is the >> same. We looked into it and found that there actually is “GlobalLimit / >> LocalLimit” in the logical plan, but no corresponding physical operator. Is >> this a bug or something else? Attached are the logical and physical plans >> when running "SELECT * FROM seq LIMIT 1". >> >> >> More specifically, we expected an early stop upon getting adequate results. >> Thanks so much. >> >> Best, >> Liz
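The early-stop behaviour Liz describes can be modelled in plain Scala (no Spark involved): a local limit takes at most n rows from each partition, a global limit takes at most n rows from their concatenation, and because partitions are read through lazy iterators the scan stops once n rows have been produced. `LimitSketch`, `ScanCounter` and the four-partition layout are hypothetical; this sketch illustrates the intended semantics only, not how Spark 2.0 plans or executes `CollectLimit`, `GlobalLimit` or `LocalLimit`.

```scala
object LimitSketch {
  // Mutable counter recording how many rows the modelled "scan" has produced.
  final class ScanCounter { var rowsRead: Long = 0L }

  // One partition, modelled as a lazy iterator over row ids in [start, end).
  // The counter only advances when a row is actually pulled with next().
  def partition(start: Long, end: Long, counter: ScanCounter): Iterator[Long] =
    (start until end).iterator.map { id =>
      counter.rowsRead += 1
      id
    }

  // LocalLimit: take at most n rows from each partition.
  // GlobalLimit: take at most n rows from the concatenation.
  // Partitions are passed as thunks, so partitions that are not needed are never opened.
  def limit(partitions: Seq[() => Iterator[Long]], n: Int): Vector[Long] =
    partitions.iterator.flatMap(open => open().take(n)).take(n).toVector

  def main(args: Array[String]): Unit = {
    val counter = new ScanCounter
    val rowsPerPartition = 25000000L // 4 x 25M = 100M rows, as in the question
    val parts = (0 until 4).map(i => () => partition(i * rowsPerPartition, (i + 1) * rowsPerPartition, counter))
    val rows = limit(parts, 500)
    println(s"returned ${rows.size} rows after reading ${counter.rowsRead}")
  }
}
```

Running `main` reports 500 rows returned after 500 rows read. For a JDBC source, the approach the reply refers to is to pass a parenthesized subquery (for example one containing `LIMIT 500`) as the `dbtable` option, so that the database applies the limit before Spark reads anything.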
How to avoid the delay associated with Hive Metastore when loading parquet?
Hi, I'm loading Parquet files via Spark, and I see that the first time a file is loaded there is a 5-10 s delay related to the Hive Metastore, with metastore-related messages in the console. How can I avoid this delay and keep the metadata around? I want the data to persist even after killing the JVM/SparkSession, and to avoid this delay. I have configured hive-site.xml to use a MySQL DB as the metastore. I thought that would solve the problem by giving Spark a persistent metastore, but it did not help, so I don't quite understand what's going on. How do I keep the metadata around and avoid the delay? Here is the relevant code and config: *Initializing the SparkSession, storing and reading data via parquet* *hive-site.xml* *Console output*
Re: RDD groupBy() then random sort each group ?
Thanks, this direction seems to be in line with what I want. What I really want is to groupBy() and then, for the rows in each group, get an Iterator and run each element from the iterator through a local function (specifically SGD). Right now the Dataset API provides this, but it's literally an Iterator, so I can't "reset" it, and SGD needs the ability to make multiple passes over the data. On Sat, Oct 22, 2016 at 1:22 PM, Koert Kuipers wrote: > groupBy always materializes the entire group (on disk or in memory), which > is why you should avoid it for large groups. > > The key is to never materialize the grouped and shuffled data. > > To see one approach to doing this, take a look at > https://github.com/tresata/spark-sorted > > It's basically a combination of smart partitioning and secondary sort. > > On Oct 20, 2016 1:55 PM, "Yang" wrote: > >> in my application, I group the training samples by their model_id >> (the input table contains training samples for 100k different models), >> then each group ends up having about 1 million training samples, >> >> then I feed that group of samples to a little Logistic Regression solver >> (SGD), but SGD requires the input data to be shuffled randomly (so that >> positive and negative samples are evenly distributed), so now I do >> something like >> >> my_input_rdd.groupBy(x=>x.model_id).map(x=> >> val (model_id, group_of_rows) = x >> >> (model_id, group_of_rows.toSeq().shuffle() ) >> >> ).map(x=> (x._1, train_sgd(x._2)) >> >> >> the issue is that on the 3rd row above, I had to explicitly call toSeq() >> on the group_of_rows in order to shuffle, which is an Iterable and not a Seq. >> now I have to load the entire 1 million rows into memory, and in practice >> I've seen my tasks OOM and GC time goes crazy (about 50% of total run >> time). I suspect this toSeq() is the reason, since doing a simple count() >> on the groupBy() result works fine.
>> >> I am planning to shuffle the my_input_rdd first, and then groupBy(), and >> not do the toSeq().shuffle(). intuitively the input rdd is already >> shuffled, so UNLESS groupBy() tries to do some sorting, the rows in the >> group SHOULD remain shuffled but overall this remains rather flimsy. >> >> any ideas to do this more reliably? >> >> thanks! >> >>
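In plain Scala, `Iterable` has no `shuffle` method: a group has to be materialised into an indexed collection before `scala.util.Random.shuffle` can permute it, and that same collection is what lets SGD make several passes, which a single-use `Iterator` cannot. The sketch below is plain Scala rather than Spark; `Sample`, `shuffleGroups` and the one-feature logistic-regression `trainSgd` are hypothetical stand-ins for `train_sgd`. It keeps each whole group in memory as a `Vector`, so it does not remove the memory pressure described above; it only shows a correct shuffle-then-multi-pass pattern.

```scala
import scala.util.Random

object GroupShuffleSketch {
  // Hypothetical training sample: model id, 0/1 label and a single feature.
  final case class Sample(modelId: Int, label: Double, x: Double)

  // Groups samples by model id and shuffles each group with a seeded RNG,
  // so results are reproducible for a given seed.
  def shuffleGroups(samples: Seq[Sample], seed: Long): Map[Int, Vector[Sample]] = {
    val rng = new Random(seed)
    samples.groupBy(_.modelId).map { case (id, group) =>
      id -> rng.shuffle(group.toVector)
    }
  }

  // Plain SGD for 1-D logistic regression over one group. Each epoch walks a
  // fresh shuffle of the same Vector; an Iterator could be walked only once.
  def trainSgd(group: Vector[Sample], epochs: Int, lr: Double, seed: Long): (Double, Double) = {
    val rng = new Random(seed)
    var w = 0.0
    var b = 0.0
    for (_ <- 1 to epochs; s <- rng.shuffle(group)) {
      val p = 1.0 / (1.0 + math.exp(-(w * s.x + b)))
      val err = p - s.label // gradient of the log-loss with respect to the logit
      w -= lr * err * s.x
      b -= lr * err
    }
    (w, b)
  }

  def main(args: Array[String]): Unit = {
    val samples = for (m <- 1 to 2; x <- -5 to 5 if x != 0) yield Sample(m, if (x > 0) 1.0 else 0.0, x.toDouble)
    for ((id, group) <- shuffleGroups(samples, seed = 42L)) {
      val (w, b) = trainSgd(group, epochs = 20, lr = 0.1, seed = id.toLong)
      println(f"model $id: w=$w%.3f b=$b%.3f")
    }
  }
}
```

Reshuffling at the start of every epoch is a common SGD choice; shuffling once, as in the original snippet, would also work with this structure.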
Re: RDD groupBy() then random sort each group ?
Thanks, this is exactly what I ended up doing. Though it seemed to work, there seems to be no guarantee that the randomness after sortWithinPartitions() would be preserved after a further groupBy. On Fri, Oct 21, 2016 at 3:55 PM, Cheng Lian wrote: > I think it would be much easier to use the DataFrame API to do this by doing a > local sort using randn() as the key. For example, in Spark 2.0: > > val df = spark.range(100) > val shuffled = df.repartition($"id" % 10).sortWithinPartitions(randn(42)) > > Replace df with a DataFrame wrapping your RDD, and $"id" % 10 with the key > to group by; then you can get the RDD from shuffled and do the following > operations you want. > > Cheng > > > > On 10/20/16 10:53 AM, Yang wrote: > >> in my application, I group the training samples by their model_id >> (the input table contains training samples for 100k different models), then >> each group ends up having about 1 million training samples, >> >> then I feed that group of samples to a little Logistic Regression solver >> (SGD), but SGD requires the input data to be shuffled randomly (so that >> positive and negative samples are evenly distributed), so now I do >> something like >> >> my_input_rdd.groupBy(x=>x.model_id).map(x=> >> val (model_id, group_of_rows) = x >> >> (model_id, group_of_rows.toSeq().shuffle() ) >> >> ).map(x=> (x._1, train_sgd(x._2)) >> >> >> the issue is that on the 3rd row above, I had to explicitly call toSeq() >> on the group_of_rows in order to shuffle, which is an Iterable and not a Seq. >> now I have to load the entire 1 million rows into memory, and in practice >> I've seen my tasks OOM and GC time goes crazy (about 50% of total run >> time). I suspect this toSeq() is the reason, since doing a simple count() >> on the groupBy() result works fine. >> >> I am planning to shuffle the my_input_rdd first, and then groupBy(), and >> not do the toSeq().shuffle().
intuitively the input rdd is already >> shuffled, so UNLESS groupBy() tries to do some sorting, the rows in the >> group SHOULD remain shuffled but overall this remains rather flimsy. >> >> any ideas to do this more reliably? >> >> thanks! >> >> >
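`sortWithinPartitions(randn(42))` randomises row order inside a partition but does not keep each key's rows together, so grouping by key still needs another step, and a later `groupBy` gives no ordering guarantee. The secondary-sort idea from Koert's reply avoids that second grouping: sort each partition by the pair (key, random number), so rows of one key become contiguous and randomly ordered, then cut the partition into runs of equal keys in a single pass. The sketch models one partition with plain Scala collections; in Spark this per-partition logic would run after repartitioning by key (for instance inside `mapPartitions`). The helper names are hypothetical, and this is not the API of the spark-sorted library.

```scala
import scala.util.Random

object SecondarySortSketch {
  // Sorts one partition by (key, random tie-breaker): rows of the same key
  // become contiguous, and their order within the key is random.
  def sortByKeyThenRandom[V](partition: Seq[(Int, V)], seed: Long): Vector[(Int, V)] = {
    val rng = new Random(seed)
    partition
      .map(row => (row, rng.nextLong()))
      .sortBy { case ((key, _), r) => (key, r) }
      .map(_._1)
      .toVector
  }

  // Cuts a key-sorted sequence into consecutive runs of equal keys in one pass,
  // without a second shuffle or groupBy.
  def runsByKey[V](sorted: Vector[(Int, V)]): Vector[(Int, Vector[V])] = {
    val out = Vector.newBuilder[(Int, Vector[V])]
    var i = 0
    while (i < sorted.length) {
      val key = sorted(i)._1
      var j = i
      while (j < sorted.length && sorted(j)._1 == key) j += 1
      out += key -> sorted.slice(i, j).map(_._2)
      i = j
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val partition = Seq((2, "a"), (1, "b"), (2, "c"), (1, "d"), (3, "e"), (2, "f"))
    for ((key, values) <- runsByKey(sortByKeyThenRandom(partition, seed = 42L)))
      println(s"key $key -> ${values.mkString(", ")}")
  }
}
```

Each run is still materialised as a `Vector`, which SGD needs for multiple passes anyway; what the sort avoids is relying on a second `groupBy` to preserve the random order.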
Re: PostgresSql queries vs spark sql
I found it. We can use pivot, which is similar to crosstab in Postgres. Thank you. On Oct 17, 2016 10:00 PM, "Selvam Raman" wrote: > Hi, > > Please share some ideas if you have worked on this before. > How can I implement the Postgres CROSSTAB function in Spark? > > Postgres Example > > Example 1: > > SELECT mthreport.* > FROM > *crosstab*('SELECT i.item_name::text As row_name, > to_char(if.action_date, ''mon'')::text As bucket, > SUM(if.num_used)::integer As bucketvalue > FROM inventory As i INNER JOIN inventory_flow As if > ON i.item_id = if.item_id > AND action_date BETWEEN date ''2007-01-01'' and date ''2007-12-31 23:59'' > GROUP BY i.item_name, to_char(if.action_date, ''mon''), > date_part(''month'', if.action_date) > ORDER BY i.item_name', > 'SELECT to_char(date ''2007-01-01'' + (n || '' month'')::interval, > ''mon'') As short_mname > FROM generate_series(0,11) n') > As mthreport(item_name text, jan integer, feb integer, mar integer, > apr integer, may integer, jun integer, jul integer, > aug integer, sep integer, oct integer, nov integer, > dec integer) > > The output of the above crosstab looks as follows: > [image: crosstab source_sql cat_sql example] > > Example 2: > > CREATE TABLE ct(id SERIAL, rowid TEXT, attribute TEXT, value TEXT); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att1','val1'); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att2','val2'); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att3','val3'); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att4','val4'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att1','val5'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att2','val6'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att3','val7'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att4','val8'); > > SELECT * > FROM crosstab( > 'select rowid, attribute, value > from ct > where attribute = ''att2'' or attribute = ''att3'' > order by 1,2') > AS
ct(row_name text, category_1 text, category_2 text, category_3 text); > > row_name | category_1 | category_2 | category_3 > ----------+------------+------------+------------ > test1 | val2 | val3 | > test2 | val6 | val7 | > > > -- > Selvam Raman > "Shun bribery, hold your head high."
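Spark's counterpart to `crosstab` is a pivot on grouped data; on a DataFrame holding the `ct` rows it would look roughly like `df.groupBy("rowid").pivot("attribute", Seq("att2", "att3")).agg(first("value"))`, with `first` from `org.apache.spark.sql.functions` (that call is not exercised here). To show what such a pivot computes, the plain-Scala model below reproduces Example 2: rows are grouped by `rowid` and each requested attribute becomes one column, with `None` where a row has no value. One difference from `crosstab`: a pivot names its columns after the pivot values (`att2`, `att3`) instead of filling `category_1`, `category_2`, ... by position. `PivotSketch` and `Ct` are hypothetical names.

```scala
object PivotSketch {
  // One row of the `ct` table from Example 2 (the SERIAL id is omitted).
  final case class Ct(rowid: String, attribute: String, value: String)

  // For each rowid (in sorted order), emit one Option per requested attribute.
  // If an attribute occurs more than once for a rowid, the first value found is
  // kept, loosely analogous to an agg(first(...)) aggregation.
  def pivot(rows: Seq[Ct], columns: Seq[String]): Vector[(String, Vector[Option[String]])] = {
    val byRow = rows.groupBy(_.rowid)
    byRow.keys.toVector.sorted.map { rowid =>
      val cells = byRow(rowid)
      rowid -> columns.toVector.map(c => cells.find(_.attribute == c).map(_.value))
    }
  }

  def main(args: Array[String]): Unit = {
    val ct = Seq(
      ("test1", "att1", "val1"), ("test1", "att2", "val2"), ("test1", "att3", "val3"), ("test1", "att4", "val4"),
      ("test2", "att1", "val5"), ("test2", "att2", "val6"), ("test2", "att3", "val7"), ("test2", "att4", "val8")
    ).map { case (r, a, v) => Ct(r, a, v) }
    for ((rowid, cells) <- pivot(ct, Seq("att2", "att3")))
      println((rowid +: cells.map(_.getOrElse(""))).mkString(" | "))
  }
}
```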
Spark submit running spark-sql-perf and additional jar
Hi, I run the following script: home/spark-2.0.1-bin-hadoop2.7/bin/spark-submit --conf "someconf" "--jars /home/user/workspace/auxdriver/target/auxdriver.jar,/media/sf_VboxShared/tpc-ds/spark-sql-perf-v.0.2.4/spark-sql-perf-assembly-0.2.4.jar --benchmark DatabasePerformance --iterations 1 --sparkMaster somemaster --location "location" --scaleFactor 50 --appName "nvm" and I get the following error: Error: Cannot load main class from JAR file:/home/irina/workspace/stocator-s3/target/stocator-s3-1.0-jar-with-dependencies.jar,/media/sf_VboxShared/tpc-ds/spark-sql-perf-v.0.2.4/spark-sql-perf-assembly-0.2.4.jar If I change the order, I get: Error: Unknown argument '/home/user/workspace/auxdriver/target/auxdriver.jar' If I run without auxdriver, it works OK, but I need to run with it to test the driver. What is the proper way to run spark-submit with two JARs, one of which takes parameters?
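`spark-submit` expects its own options first, then exactly one application JAR (the primary resource), then the application's arguments; additional JARs go into a single comma-separated `--jars` value. The first error appears to show the comma-separated pair being taken as the primary resource. The plain-Scala helper below only assembles an argument vector in that order and renders it as a quoted shell line; `SubmitArgsSketch` is a hypothetical name, the paths come from the post, and `"someconf"` is the post's placeholder for a real `key=value` setting. Whether `--class` is also needed depends on whether the application JAR's manifest declares a main class (the post suggests it does, since the command works without the extra JAR).

```scala
object SubmitArgsSketch {
  // Builds arguments in the order spark-submit parses them:
  // [spark-submit options] <application jar> [application arguments]
  def build(
      conf: Seq[String],
      extraJars: Seq[String],
      appJar: String,
      appArgs: Seq[String]): Vector[String] = {
    val confArgs = conf.flatMap(c => Seq("--conf", c))
    // All extra JARs share one comma-separated --jars value.
    val jarArgs = if (extraJars.isEmpty) Seq.empty[String] else Seq("--jars", extraJars.mkString(","))
    (confArgs ++ jarArgs ++ Seq(appJar) ++ appArgs).toVector
  }

  // Renders a command line for a POSIX shell, single-quoting every argument.
  def toShell(cmd: String, args: Seq[String]): String =
    (cmd +: args).map(a => "'" + a.replace("'", "'\\''") + "'").mkString(" ")

  def main(args: Array[String]): Unit = {
    val submitArgs = build(
      conf = Seq("someconf"),
      extraJars = Seq("/home/user/workspace/auxdriver/target/auxdriver.jar"),
      appJar = "/media/sf_VboxShared/tpc-ds/spark-sql-perf-v.0.2.4/spark-sql-perf-assembly-0.2.4.jar",
      appArgs = Seq("--benchmark", "DatabasePerformance", "--iterations", "1",
        "--sparkMaster", "somemaster", "--location", "location",
        "--scaleFactor", "50", "--appName", "nvm"))
    println(toShell("spark-submit", submitArgs))
  }
}
```

In the resulting command, `--jars` carries only `auxdriver.jar`, the spark-sql-perf assembly is the primary resource, and everything after it is passed to the benchmark.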
Random forest classifier error : Size exceeds Integer.MAX_VALUE
Hi, I am trying to train a random forest classifier. I have a predefined classification set (classifications.csv, ~300,000 lines). While fitting, I get a "Size exceeds Integer.MAX_VALUE" error. Here is the code:

import java.util.Locale
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer}
import org.apache.spark.sql.SparkSession
// The import for the Resha stemmer was not shown in the original post.

object Test1 {
  var savePath = "c:/Temp/SparkModel/"
  var stemmer = Resha.Instance
  var STOP_WORDS: Set[String] = Set()

  // Strips parenthesized text and replaces dashes and commas with spaces.
  def cropSentence(s: String) = {
    s.replaceAll("\\([^\\)]*\\)", "")
      .replaceAll(" - ", " ")
      .replaceAll("-", " ")
      .replaceAll(" +", " ")
      .replaceAll(",", " ").trim()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkConf().setAppName("Test").setMaster("local[*]")
      .set("spark.sql.warehouse.dir", "D:/Temp/wh")
      .set("spark.executor.memory", "12g")
      .set("spark.driver.memory", "4g")
      .set("spark.hadoop.validateOutputSpecs", "false")
    val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
    import spark.implicits._

    // tokens(0) is the product name (lower-cased, cropped and stemmed), tokens(1) the class name.
    val mainDataset = spark.sparkContext.textFile("file:///C:/Temp/classifications.csv")
      .map(_.split(";"))
      .map(tokens => {
        var list = new ListBuffer[String]()
        var token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")))
        token0.split("\\s+").foreach(t => list += stemmer.stem(t))
        (tokens(1), tokens(0), list.toList.mkString(" "))
      })
      // Note: the tuple has three fields, but only two column names are given here.
      .toDF("className", "productName")

    val classIndexer = new StringIndexer()
      .setInputCol("className")
      .setOutputCol("label")
    val classIndexerModel = classIndexer.fit(mainDataset)
    var mainDS = classIndexerModel.transform(mainDataset)
    classIndexerModel.write.overwrite.save(savePath + "ClassIndexer")

    // Tokenizer
    val tokenizer = new Tokenizer()
      .setInputCol("productName")
      .setOutputCol("words_nonfiltered")

    // StopWords
    val remover = new StopWordsRemover()
      .setInputCol("words_nonfiltered")
      .setOutputCol("words")
      .setStopWords(Array[String]("stop1", "stop2", "stop3"))

    // CountVectorize
    val countVectorizer = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")

    val rfc = new RandomForestClassifier()
      .setLabelCol("label")
      .setNumTrees(50)
      .setMaxDepth(15)
      .setFeatureSubsetStrategy("auto")
      .setFeaturesCol("features")
      .setImpurity("gini")
      .setMaxBins(32)

    val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, rfc))
    val train = mainDS
    val model = pipeline.fit(train) // <= EXCEPTION
    model.write.overwrite.save(savePath + "RandomForestClassifier")
  }
}

16/10/23 19:10:37 INFO scheduler.DAGScheduler: Job 8 failed: collectAsMap at RandomForest.scala:550, took 848.552917 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 36, localhost): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:674)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
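The top frame of the trace, `sun.nio.ch.FileChannelImpl.map`, is Java's `FileChannel.map`, which rejects regions larger than `Integer.MAX_VALUE` bytes (just under 2 GiB). Here it is called while Spark's `DiskStore` reads a block back, so a single persisted partition of the training data appears to have exceeded that size. A common mitigation is to spread the data over more, smaller partitions, for example with `train.repartition(n)` before `pipeline.fit(train)` (not tested here). The plain-Scala helper below only does the sizing arithmetic; `PartitionSizing`, the total-size estimate and the safety factor of 2 are hypothetical inputs, and the result assumes partitions of roughly equal size.

```scala
object PartitionSizing {
  // FileChannel.map cannot map a region larger than Integer.MAX_VALUE bytes.
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  // Smallest partition count such that an evenly split dataset of `totalBytes`
  // keeps every partition at or below MaxBlockBytes / safety (ceiling division).
  def minPartitions(totalBytes: Long, safety: Double = 2.0): Int = {
    require(totalBytes >= 0 && safety >= 1.0)
    val budget = (MaxBlockBytes / safety).toLong
    math.max(1L, (totalBytes + budget - 1) / budget).toInt
  }

  def main(args: Array[String]): Unit = {
    val estimatedBytes = 10L * 1024 * 1024 * 1024 // hypothetical: 10 GiB of persisted training data
    println(s"use at least ${minPartitions(estimatedBytes)} partitions")
  }
}
```

With the default safety factor the per-partition budget is 1073741823 bytes, so the hypothetical 10 GiB estimate yields 11 partitions.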
Spark streaming crashes with high throughput
Hi, I am getting the error *Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.* with a Spark Streaming job. I am using Spark 2.0.0. The job is a simple windowed aggregation, and the stream is read from a socket. The average throughput is 220K tuples/s. The job runs as expected for a while (approx. 10 minutes), then I get the message above. There is a similar thread on the mailing list, but no conclusion was reached there. Thanks, Jeyhun -- -Cheers Jeyhun
Re: HashingTF for TF.IDF computation
Thanks Yanbo! On Sun, Oct 23, 2016 at 1:57 PM, Yanbo Liang wrote: > HashingTF was not designed to handle your case; you can try > CountVectorizer, which will keep the original terms as a vocabulary for > retrieval. CountVectorizer will compute a global term-to-index map, > which can be expensive for a large corpus and carries a risk of OOM. IDF > can accept feature vectors generated by HashingTF or CountVectorizer. > FYI http://spark.apache.org/docs/latest/ml-features.html#tf-idf > > Thanks > Yanbo > > On Thu, Oct 20, 2016 at 10:00 AM, Ciumac Sergiu > wrote: > >> Hello everyone, >> >> I'm having a usage issue with the HashingTF class from Spark MLlib. >> >> I'm computing TF.IDF on a set of terms/documents, which I later use to >> identify the most important terms in each input document. >> >> Below is a short code snippet which outlines the example (2 documents >> with 2 words each, executed on Spark 2.0). >> >> val documentsToEvaluate = sc.parallelize(Array(Seq("Mars", >> "Jupiter"),Seq("Venus", "Mars"))) >> val hashingTF = new HashingTF() >> val tf = hashingTF.transform(documentsToEvaluate) >> tf.cache() >> val idf = new IDF().fit(tf) >> val tfidf: RDD[Vector] = idf.transform(tf) >> documentsToEvaluate.zip(tfidf).saveAsTextFile("/tmp/tfidf") >> >> The computation yields the following result: >> >> (List(Mars, Jupiter),(1048576,[593437,962819],[0.4054651081081644,0.0])) >> (List(Venus, Mars),(1048576,[798918,962819],[0.4054651081081644,0.0])) >> >> My concern is that I can't get a mapping of TF.IDF weights to the initial >> terms used (i.e. Mars : 0.0, Jupiter : 0.4, Venus : 0.4. You may notice >> that the weights and term indices do not correspond after zipping the 2 >> sequences). I can only identify the hash (i.e. 593437 : 0.4) mappings. >> >> I know HashingTF uses the hashing trick to compute TF. My question is: is it >> possible to retrieve the term / weight mapping, or was HashingTF not designed >> to handle this use case? If the latter, what other implementation of TF.IDF would you >> recommend? >> >> I may continue the computation with the (*hash:weight*) tuple, though >> getting the initial (*term:weight)* would make debugging a lot easier >> later down the pipeline. >> >> Any response will be greatly appreciated! >> >> Regards, Sergiu Ciumac >> > >
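The hashing trick maps each term to `hash(term) mod numFeatures`, so the vector stores only indices. Because the original documents are still available, a term-to-weight view can be rebuilt by hashing each document's own terms again and reading the weight at that index; the RDD-based `org.apache.spark.mllib.feature.HashingTF` exposes `indexOf(term)` for this lookup. The plain-Scala sketch below uses `String.hashCode` with a non-negative modulo as a stand-in hash, which is not the MurmurHash3 that Spark 2.0's `HashingTF` uses by default, so its indices will not match the ones printed above. It also shows that distinct terms can collide on one index, which is why the reverse mapping is not unique in general. `HashingTrickSketch` and its methods are hypothetical.

```scala
object HashingTrickSketch {
  // Stand-in hash: String.hashCode mapped into [0, numFeatures).
  def indexOf(term: String, numFeatures: Int): Int =
    Math.floorMod(term.hashCode, numFeatures)

  // Term frequencies of one document as a sparse map: index -> count.
  def termFrequencies(doc: Seq[String], numFeatures: Int): Map[Int, Double] =
    doc.groupBy(indexOf(_, numFeatures)).map { case (i, ts) => i -> ts.size.toDouble }

  // Recovers term -> weight for one document by re-hashing its own terms.
  // Colliding terms necessarily report the same (shared) weight.
  def termWeights(doc: Seq[String], weights: Map[Int, Double], numFeatures: Int): Map[String, Double] =
    doc.distinct.map(t => t -> weights.getOrElse(indexOf(t, numFeatures), 0.0)).toMap

  // Indices shared by more than one term of a vocabulary (hash collisions).
  def collisions(vocab: Seq[String], numFeatures: Int): Map[Int, Seq[String]] =
    vocab.distinct.groupBy(indexOf(_, numFeatures)).filter(_._2.size > 1)

  def main(args: Array[String]): Unit = {
    val n = 1 << 20 // same vector size as in the output above (1048576)
    val doc = Seq("Mars", "Jupiter")
    println(termWeights(doc, termFrequencies(doc, n), n))
    println(collisions(Seq("Aa", "BB", "Mars"), n)) // "Aa" and "BB" share a Java hashCode
  }
}
```

In a real pipeline the `weights` map would come from the TF-IDF vector of the same document rather than from raw term frequencies.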
Dataflow of Spark/Hadoop in steps
I would like to know, if I have 100 GB of data and I want to find the most common word, what is actually going on in my cluster (let's say a master node and 6 workers) step by step (1). What does the master do (2)? Start the MapReduce job, monitor the traffic and return the result? The same goes for what the mappers and reducers (can they be on different nodes/workers?) do (3). Do the reducers always wait for all the mappers to finish before they start (4)? And who combines/attaches the final output? For example, these are the inputs to the reducers, as tuples (I go for an easy example where each word is unique to one reducer, which means the shuffle step has been done correctly): reducer 1: {dog,1}, {banana,1}, {oreo,6} reducer 2: {peach,1}, {mesut,5}, {ozil,10} reducer 3: {I,4}, {witch,2} reducer 4: {fear,1}, {goal,6}, {arsenal,3} The output of each reducer should be: reducer 1: {oreo,6} reducer 2: {ozil,10} reducer 3: {I,4} reducer 4: {goal,6} Now we need to combine the results: to whom do we send the output, and who will sort and aggregate it (the master?) (5)? And in those steps and before, where are the I/O calls (6) (when is the data stored on local disk and when on HDFS)? In addition, in Hadoop, as far as I know, we need to deploy the Map and Reduce functions to the matching workers; can we change the functions at run time (7)? If we have done a MapReduce job and we want to do it again (go over our results), do we have to split the data again and send it to each worker (8)? P.S. I have numbered the questions to make it clear where they are. Any more comments or notions would be appreciated :)
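The word-count-and-maximum flow in the question can be modelled in a few lines of plain Scala: a map phase emits `(word, 1)`, a shuffle routes each word to one reducer by hashing, each reducer sums its counts and keeps its local maximum, and a final step (the driver in Spark, or whoever collects the reducer outputs) picks the global maximum. This is a single-process model with hypothetical names, for illustration only; it says nothing about where Spark or Hadoop place tasks or when they write to local disk versus HDFS. The test below reuses the reducer inputs from the example.

```scala
object WordCountMaxSketch {
  // Map phase: one (word, 1) pair per whitespace-separated token.
  def mapPhase(lines: Seq[String]): Seq[(String, Int)] =
    lines.flatMap(_.split("\\s+").filter(_.nonEmpty)).map(w => (w, 1))

  // Shuffle: route each pair to reducer hash(word) mod numReducers,
  // so all pairs for one word land on the same reducer.
  def shuffle(pairs: Seq[(String, Int)], numReducers: Int): Map[Int, Seq[(String, Int)]] =
    pairs.groupBy { case (w, _) => Math.floorMod(w.hashCode, numReducers) }

  // Reduce phase: sum counts per word, then keep this reducer's top word.
  def reduceToLocalMax(pairs: Seq[(String, Int)]): Option[(String, Int)] = {
    val counts = pairs.groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }
    if (counts.isEmpty) None else Some(counts.maxBy(_._2))
  }

  // Final combine: the global maximum of the reducers' local maxima.
  def globalMax(localMaxima: Seq[Option[(String, Int)]]): Option[(String, Int)] = {
    val present = localMaxima.flatten
    if (present.isEmpty) None else Some(present.maxBy(_._2))
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("I fear the goal", "the goal of arsenal", "ozil ozil goal")
    val reducers = shuffle(mapPhase(lines), numReducers = 4)
    val local = reducers.toSeq.sortBy(_._1).map { case (r, pairs) => r -> reduceToLocalMax(pairs) }
    local.foreach { case (r, m) => println(s"reducer $r: $m") }
    println(s"global: ${globalMax(local.map(_._2))}")
  }
}
```

Because only one `(word, count)` pair per reducer reaches the final step, that step handles a handful of values rather than the full data set.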
Re: HashingTF for TF.IDF computation
HashingTF was not designed to handle your case; you can try CountVectorizer, which will keep the original terms as a vocabulary for retrieval. CountVectorizer will compute a global term-to-index map, which can be expensive for a large corpus and carries a risk of OOM. IDF can accept feature vectors generated by HashingTF or CountVectorizer. FYI http://spark.apache.org/docs/latest/ml-features.html#tf-idf Thanks Yanbo On Thu, Oct 20, 2016 at 10:00 AM, Ciumac Sergiu wrote: > Hello everyone, > > I'm having a usage issue with the HashingTF class from Spark MLlib. > > I'm computing TF.IDF on a set of terms/documents, which I later use to > identify the most important terms in each input document. > > Below is a short code snippet which outlines the example (2 documents with > 2 words each, executed on Spark 2.0). > > val documentsToEvaluate = sc.parallelize(Array(Seq("Mars", > "Jupiter"),Seq("Venus", "Mars"))) > val hashingTF = new HashingTF() > val tf = hashingTF.transform(documentsToEvaluate) > tf.cache() > val idf = new IDF().fit(tf) > val tfidf: RDD[Vector] = idf.transform(tf) > documentsToEvaluate.zip(tfidf).saveAsTextFile("/tmp/tfidf") > > The computation yields the following result: > > (List(Mars, Jupiter),(1048576,[593437,962819],[0.4054651081081644,0.0])) > (List(Venus, Mars),(1048576,[798918,962819],[0.4054651081081644,0.0])) > > My concern is that I can't get a mapping of TF.IDF weights to the initial > terms used (i.e. Mars : 0.0, Jupiter : 0.4, Venus : 0.4. You may notice > that the weights and term indices do not correspond after zipping the 2 > sequences). I can only identify the hash (i.e. 593437 : 0.4) mappings. > > I know HashingTF uses the hashing trick to compute TF. My question is: is it > possible to retrieve the term / weight mapping, or was HashingTF not designed > to handle this use case? If the latter, what other implementation of TF.IDF would you > recommend? > > I may continue the computation with the (*hash:weight*) tuple, though > getting the initial (*term:weight)* would make debugging a lot easier > later down the pipeline. > > Any response will be greatly appreciated! > > Regards, Sergiu Ciumac >
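With an explicit vocabulary (the approach Yanbo suggests via `CountVectorizer`), every column index maps back to a term, so weights can be reported per term. The sketch below is plain Scala, not Spark's `CountVectorizer`/`IDF`; it builds an alphabetically sorted vocabulary (a simplification, since Spark orders the `CountVectorizer` vocabulary by corpus frequency) and applies the smoothed inverse document frequency documented for Spark's `IDF`, idf(t) = ln((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing t. With the two example documents, `Mars` occurs in both, giving ln(3/3) = 0, while `Jupiter` and `Venus` each give ln(3/2) ≈ 0.405, matching the weights in the output above. `VocabTfIdfSketch` and its methods are hypothetical.

```scala
object VocabTfIdfSketch {
  // Alphabetically sorted vocabulary: term -> column index.
  def vocabulary(docs: Seq[Seq[String]]): Map[String, Int] =
    docs.flatten.distinct.sorted.zipWithIndex.toMap

  // Smoothed IDF: ln((m + 1) / (df + 1)), with df counted once per document.
  def idf(docs: Seq[Seq[String]]): Map[String, Double] = {
    val m = docs.size
    docs.flatMap(_.distinct).groupBy(identity).map { case (t, occ) =>
      t -> math.log((m + 1.0) / (occ.size + 1.0))
    }
  }

  // TF-IDF per document, keyed by the original term rather than by an index.
  def tfIdf(docs: Seq[Seq[String]]): Seq[Map[String, Double]] = {
    val weights = idf(docs)
    docs.map(doc => doc.groupBy(identity).map { case (t, ts) => t -> ts.size * weights(t) })
  }

  // Sparse (index, weight) form of one document, sorted by index.
  def toSparse(doc: Map[String, Double], vocab: Map[String, Int]): Seq[(Int, Double)] =
    doc.toSeq.map { case (t, w) => vocab(t) -> w }.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val docs = Seq(Seq("Mars", "Jupiter"), Seq("Venus", "Mars"))
    val vocab = vocabulary(docs)
    val byIndex = vocab.map(_.swap) // index -> term, for reading vectors back
    for (doc <- tfIdf(docs)) {
      val sparse = toSparse(doc, vocab)
      println(sparse.map { case (i, w) => s"${byIndex(i)}:$w" }.mkString(", "))
    }
  }
}
```

The trade-off Yanbo mentions shows up here too: the vocabulary map has to hold every distinct term of the corpus, which the hashing trick avoids.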