RE: HBase HTable constructor hangs
I turned on TRACE and I see a lot of the following exception:

    java.lang.IllegalAccessError: com/google/protobuf/ZeroCopyLiteralByteString
        at org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:897)
        at org.apache.hadoop.hbase.protobuf.RequestConverter.buildGetRowOrBeforeRequest(RequestConverter.java:131)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1402)
        at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:701)
        at org.apache.hadoop.hbase.client.HTable$2.call(HTable.java:699)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:120)
        at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
        at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
        at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:192)

Thanks
Tridib

From: d...@ocirs.com
Date: Tue, 28 Apr 2015 22:24:39 -0700

In that case, something else is failing, and the reason HBase looks like it hangs is that the HBase timeout or retry count is too high. Try setting the following conf and HBase will only hang for a few minutes at most and return a helpful error message:

    hbaseConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2)

-- Dean Chen

On Tue, Apr 28, 2015 at 10:18 PM, Tridib Samanta wrote:

Nope, my HBase is unsecured.

From: d...@ocirs.com
Date: Tue, 28 Apr 2015 22:09:51 -0700

Hi Tridib, are you running this on a secure Hadoop/HBase cluster? I ran into a similar issue where the HBase client can successfully connect in local mode and in the yarn-client driver, but not on remote executors. The problem is that Spark doesn't distribute the HBase auth key; see the following JIRA ticket and PR: https://issues.apache.org/jira/browse/SPARK-6918

-- Dean Chen

On Tue, Apr 28, 2015 at 9:34 PM, Tridib Samanta wrote:

I am not 100% sure how it's picking up the configuration. I copied hbase-site.xml into the HDFS/Spark cluster (single machine). I also included hbase-site.xml in the spark-job jar file, which also has yarn-site.xml, mapred-site.xml and core-site.xml in it. One interesting thing: when I run the spark-job jar standalone and execute the HBase client from a main method, it works fine. The same client is unable to connect (hangs) when the jar is distributed in Spark.

Thanks
Tridib

Date: Tue, 28 Apr 2015 21:25:41 -0700
From: yuzhih...@gmail.com

How did you distribute hbase-site.xml to the nodes? Looks like HConnectionManager couldn't find the hbase:meta server.

Cheers

On Tue, Apr 28, 2015 at 9:19 PM, Tridib Samanta wrote:

I am using Spark 1.2.0 and HBase 0.98.1-cdh5.1.0. Here is the jstack trace; the complete stack trace is attached.
    "Executor task launch worker-1" #58 daemon prio=5 os_prio=0 tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000]
       java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152)
        - locked <0xf8cb7258> (a org.apache.hadoop.hbase.client.RpcRetryingCaller)
        at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
        at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
        - locked <0xf84ac0b0> (a java.lang.Object)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1054)
        at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:1011)
        at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:326)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:192)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:150)
        at ...
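For reference, a minimal sketch of the fail-fast configuration suggested earlier in this thread, assuming the HBase 0.98-era client API (the HConstants keys are String constant names, so setInt is the type-correct call; the 10s RPC timeout is an arbitrary example):

    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

    val hbaseConf = HBaseConfiguration.create()
    // Fail after 2 retries instead of the much higher 0.98 default,
    // so misconfiguration surfaces as an error rather than a long hang.
    hbaseConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2)
    // Optionally also cap the per-RPC timeout.
    hbaseConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 10000)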
Spark with standalone HBase
Hi, I am working with standalone HBase, and I want to execute HBaseTest.scala (in the Scala examples). I have created a test table with three rows, and I just want to get the row count using HBaseTest.scala. I am getting this issue:

    15/04/29 11:17:10 INFO BlockManagerMaster: Registered BlockManager
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:host.name=ip-10-144-185-113
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.version=1.7.0_79
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.class.path=/home/ubuntu/sparkfolder/conf/:/home/ubuntu/sparkfolder/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar:/home/ubuntu/sparkfolder/lib_managed/jars/datanucleus-core-3.2.10.jar:/home/ubuntu/sparkfolder/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/home/ubuntu/sparkfolder/lib_managed/jars/datanucleus-rdbms-3.2.9.jar
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:java.compiler=NA
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:os.name=Linux
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:os.arch=amd64
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:os.version=3.13.0-49-generic
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:user.name=root
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:user.home=/root
    15/04/29 11:17:11 INFO ZooKeeper: Client environment:user.dir=/home/ubuntu/sparkfolder
    15/04/29 11:17:11 INFO ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=9 watcher=hconnection-0x2711025f, quorum=localhost:2181, baseZNode=/hbase
    15/04/29 11:17:11 INFO RecoverableZooKeeper: Process identifier=hconnection-0x2711025f connecting to ZooKeeper ensemble=localhost:2181
    15/04/29 11:17:11 INFO ClientCnxn: Opening socket connection to server ip-10-144-185-113/10.144.185.113:2181. Will not attempt to authenticate using SASL (unknown error)
    15/04/29 11:17:11 INFO ClientCnxn: Socket connection established to ip-10-144-185-113/10.144.185.113:2181, initiating session
    15/04/29 11:17:11 INFO ClientCnxn: Session establishment complete on server ip-10-144-185-113/10.144.185.113:2181, sessionid = 0x14d04d506da0005, negotiated timeout = 4
    15/04/29 11:17:11 INFO ZooKeeperRegistry: ClusterId read in ZooKeeper is null

It's just stuck, not showing any error. There is no Hadoop on my machine. What could be the issue? Here is hbase-site.xml:

    <configuration>
      <property>
        <name>hbase.zookeeper.quorum</name>
        <value>localhost</value>
      </property>
      <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
      </property>
      <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
      </property>
    </configuration>
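For context, the core of the example being run is roughly the following (a sketch of what HBaseTest.scala does, assuming the HBase 0.98-era TableInputFormat API, an existing SparkContext sc, and a table named "test"):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    val conf = HBaseConfiguration.create()  // picks up hbase-site.xml from the classpath
    conf.set(TableInputFormat.INPUT_TABLE, "test")
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    println(hBaseRDD.count())

"ClusterId read in ZooKeeper is null" typically means the client reached ZooKeeper but found no running HBase cluster registered under the configured znode, so the count hangs while the client keeps looking for the HBase master/meta region.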
Re: java.io.IOException: No space left on device
Or multiple volumes. The LOCAL_DIRS (YARN) and SPARK_LOCAL_DIRS (Mesos, Standalone) environment variables and the spark.local.dir property control where temporary data is written. The default is /tmp. See http://spark.apache.org/docs/latest/configuration.html#runtime-environment for more details.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly), http://shop.oreilly.com/product/0636920033073.do
Typesafe, http://typesafe.com
@deanwampler, http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Apr 29, 2015 at 6:19 AM, Anshul Singhle wrote:

Do you have multiple disks? Maybe your work directory is not on the right disk?

On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi wrote:

Hi, I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf output; the training data is a file containing 156060 entries (size 8.1M). The problem is that when trying to persist a partition into memory and there is not enough memory, the partition is persisted on disk, and despite having 229G of free disk space, I got "No space left on device". This is how I'm running the program:

    ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master local[2] \
      --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv testData.tsv

If you need more information, please let me know. Thanks
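A hedged sketch of pointing Spark's scratch space at a larger volume (the path /mnt/bigdisk/spark-tmp is an assumption; any directory on a disk with room works):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("MainPipeline")
      .set("spark.local.dir", "/mnt/bigdisk/spark-tmp")  // where shuffle/spill files go
    val sc = new SparkContext(conf)

Equivalently, pass --conf spark.local.dir=/mnt/bigdisk/spark-tmp to spark-submit, or export SPARK_LOCAL_DIRS on Standalone/Mesos.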
Re: java.io.IOException: No space left on device
Makes sense. / is where /tmp would be. However, 230G should be plenty of space. If you have INFO logging turned on (set in $SPARK_HOME/conf/log4j.properties), you'll see messages about saving data to disk that will list sizes. The web console also has some summary information about this.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe, @deanwampler, http://polyglotprogramming.com

On Wed, Apr 29, 2015 at 6:25 AM, selim namsi wrote:

This is the output of df -h, so as you can see I'm using only one disk, mounted on /:

    $ df -h
    Filesystem      Size  Used Avail Use% Mounted on
    /dev/sda8       276G   34G  229G  13% /
    none            4.0K     0  4.0K   0% /sys/fs/cgroup
    udev            7.8G  4.0K  7.8G   1% /dev
    tmpfs           1.6G  1.4M  1.6G   1% /run
    none            5.0M     0  5.0M   0% /run/lock
    none            7.8G   37M  7.8G   1% /run/shm
    none            100M   40K  100M   1% /run/user
    /dev/sda1       496M   55M  442M  11% /boot/efi

Also, when running the program, I noticed that the Used% disk space for the partition mounted on / was growing very fast.
How Spark SQL supports primary and secondary indexes
Hi all, I executed a simple SQL query and got unacceptable performance. I do the following steps:

1. Apply a schema to an RDD and register the table.
2. Run a SQL query which returns several entries.

Running time for this query is 0.2 s (the table contains 10 entries). I think that Spark SQL does a full in-memory scan, and an index might increase performance. How can I add indexes?
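As far as I know, Spark SQL in the 1.x line has no user-definable primary or secondary indexes; the usual substitute is caching the table in the in-memory columnar store, which keeps per-batch column statistics that let later scans skip data. A minimal sketch, assuming a SQLContext and a registered table named "events" (both names are placeholders):

    // Cache the table in Spark SQL's columnar in-memory format.
    sqlContext.cacheTable("events")

    // The first query materializes the cache; subsequent queries scan memory
    // and can prune cached batches using min/max column statistics.
    val rows = sqlContext.sql("SELECT * FROM events WHERE id = 42").collect()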
java.io.IOException: No space left on device
Hi, I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf output; the training data is a file containing 156060 entries (size 8.1M). The problem is that when trying to persist a partition into memory and there is not enough memory, the partition is persisted on disk, and despite having 229G of free disk space, I got "No space left on device". This is how I'm running the program:

    ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master local[2] \
      --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv testData.tsv

And this is a part of the log: If you need more information, please let me know.

Thanks
How to group multiple row data ?
Hi, I have a DataFrame with schema (CustomerID, SupplierID, ProductID, Event, CreatedOn); the first 3 are long ints, Event can only be 1, 2 or 3, and CreatedOn is a timestamp. How can I group these rows into triplets/doublets/singlets such that I can infer that a customer progressed from event 1 to 2, and if present to 3, timewise, while preserving the number of entries? For example:

Before processing:
10001, 132, 2002, 1, 2012-11-23
10001, 132, 2002, 1, 2012-11-24
10031, 102, 223, 2, 2012-11-24
10001, 132, 2002, 2, 2012-11-25
10001, 132, 2002, 3, 2012-11-26
(total 5 rows)

After processing:
10001, 132, 2002, 2012-11-23, 1
10031, 102, 223, 2012-11-24, 2
10001, 132, 2002, 2012-11-24, 1,2,3
(total 5 events in the last, comma-separated field)

A group must only take the closest previous trigger; the first row hence shows alone. Can this be done using Spark SQL? If it needs to be processed functionally in Scala, how would I do that? I can't wrap my head around this. Can anyone help?
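One way to express the "closest previous trigger" rule outside SQL is to group by the id triple, sort each group by timestamp, and fold the events into chains, starting a new chain whenever an event does not continue the previous one. A sketch, assuming the rows are already parsed into a case class:

    case class Rec(cust: Long, supp: Long, prod: Long, event: Int, createdOn: String)

    val sessions = rdd
      .groupBy(r => (r.cust, r.supp, r.prod))
      .flatMapValues { recs =>
        val sorted = recs.toList.sortBy(_.createdOn)  // ISO dates sort lexicographically
        sorted.foldLeft(List.empty[(String, List[Int])]) {
          // Extend the most recent chain only if this event is the next trigger.
          case ((start, evs) :: rest, r) if evs.last + 1 == r.event =>
            (start, evs :+ r.event) :: rest
          // Otherwise start a new chain at this event's date.
          case (chains, r) => (r.createdOn, List(r.event)) :: chains
        }.reverse
      }

On the sample data this yields (2012-11-23, [1]) and (2012-11-24, [1,2,3]) for key (10001, 132, 2002), plus (2012-11-24, [2]) for (10031, 102, 223), preserving all 5 events.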
Dataframe filter based on another Dataframe
Hi everyone, what is the most efficient way to filter a DataFrame on a column from another DataFrame's column? The best idea I had was to join the two DataFrames:

    val df1: DataFrame = ...
    val df2: DataFrame = ...
    df1.join(df2, df1("id") === df2("id"), "inner")

But I end up (obviously) with the id column twice. Another approach would be to filter df1, but I can't seem to get this to work using df2's column as a base. Any idea?

Regards, Olivier.
Re: Driver memory leak?
Please use user@, not dev@.

This message does not appear to be from your driver. It also doesn't say you ran out of memory. It says you didn't tell YARN to let it use the memory you want. Look at the memory overhead param, and please search first for related discussions.

On Apr 29, 2015 11:43 AM, wyphao.2007 wrote:

Hi, dear developers. I am using Spark Streaming to read data from Kafka. The program has already run for about 120 hours, but today it failed because of the driver's OOM, as follows:

    Container [pid=49133,containerID=container_1429773909253_0050_02_01] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 3.2 GB of 50 GB virtual memory used. Killing container.

I set --driver-memory to 2g. In my mind, the driver is responsible for job scheduling and job monitoring (please correct me if I'm wrong); why is it using so much memory? So I used jmap to monitor another instance of the program (already running for about 48 hours):

    sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256

The result: the java.util.HashMap$Entry and java.lang.Long objects use about 600 MB of memory! I also used jmap to monitor another instance (running for about 1 hour), and there the java.util.HashMap$Entry and java.lang.Long objects don't use that much memory. But I found that, as time goes by, the java.util.HashMap$Entry and java.lang.Long objects occupy more and more memory. Is it a driver memory leak, or some other reason?

Thanks
Best Regards
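The "memory overhead param" mentioned above is the extra, non-heap headroom YARN grants a container beyond the JVM heap; YARN kills the container when total usage exceeds heap plus overhead. A hedged sketch of raising it for the driver (1024 MB is an arbitrary example; tune it to the job):

    spark-submit \
      --master yarn-cluster \
      --driver-memory 2g \
      --conf spark.yarn.driver.memoryOverhead=1024 \
      ...

spark.yarn.executor.memoryOverhead is the analogous setting for executors.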
Re: A problem of using spark streaming to capture network packets
I would use the ps command on each machine while the job is running to confirm that every process involved is running as root.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly)
Typesafe, @deanwampler, http://polyglotprogramming.com

On Tue, Apr 28, 2015 at 8:58 PM, Lin Hao Xu wrote:

btw, from the Spark web UI, the acl is marked with root.

Best regards,
Lin Hao XU
IBM Research China
Email: xulin...@cn.ibm.com
My Flickr: http://www.flickr.com/photos/xulinhao/sets

On 2015/04/29 09:40, Dean Wampler wrote:

Are the tasks on the slaves also running as root? If not, that might explain the problem.

dean

On Tue, Apr 28, 2015 at 8:30 PM, Lin Hao Xu wrote:

1. The full command line is written in a shell script:

    LIB=/home/spark/.m2/repository
    /opt/spark/bin/spark-submit \
      --class spark.pcap.run.TestPcapSpark \
      --jars $LIB/org/pcap4j/pcap4j-core/1.4.0/pcap4j-core-1.4.0.jar,\
    $LIB/org/pcap4j/pcap4j-packetfactory-static/1.4.0/pcap4j-packetfactory-static-1.4.0.jar,\
    $LIB/org/slf4j/slf4j-api/1.7.6/slf4j-api-1.7.6.jar,\
    $LIB/org/slf4j/slf4j-log4j12/1.7.6/slf4j-log4j12-1.7.6.jar,\
    $LIB/net/java/dev/jna/jna/4.1.0/jna-4.1.0.jar \
      /home/spark/napa/napa.jar

2. And we run this script with sudo. If you do not use sudo, you cannot access the network interface.

3. We also tested List<PcapNetworkInterface> nifs = Pcaps.findAllDevs() in a standard Java program, and it really worked like a champion.

Best regards,
Lin Hao XU

On 2015/04/28 20:07, Dean Wampler wrote:

It's probably not your code. What's the full command line you use to submit the job? Are you sure the job on the cluster has access to the network interface? Can you test the receiver by itself without Spark? For example, does this line work as expected:

    List<PcapNetworkInterface> nifs = Pcaps.findAllDevs();

dean

On Mon, Apr 27, 2015 at 4:03 AM, Hai Shan Wu wrote:

Hi everyone,

We use pcap4j to capture network packets and then use Spark Streaming to analyze the captured packets. However, we met a strange problem. If we run our application on Spark locally (for example, spark-submit --master local[2]), the program runs successfully. If we run our application on a Spark standalone cluster, the program tells us that no NIFs were found.

I also attach two test files for clarification (see attached files: PcapReceiver.java and TestPcapSpark.java). So can anyone help on this? Thanks in advance!

Best regards,
Haishan

Haishan Wu (吴海珊)
IBM Research - China
Tel: 86-10-58748508
Fax: 86-10-58748330
Email: wuh...@cn.ibm.com
Lotus Notes: Hai Shan Wu/China/IBM
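A quick way to check what the executors themselves can see is to probe the interfaces from inside a job; if the count is zero on the workers but not locally, the executor processes lack the needed privileges. A hedged sketch, assuming the pcap4j jars are on the executor classpath via --jars:

    import scala.collection.JavaConverters._
    import org.pcap4j.core.Pcaps

    val report = sc.parallelize(1 to 4).map { _ =>
      val host = java.net.InetAddress.getLocalHost.getHostName
      val user = System.getProperty("user.name")
      val nifs = Pcaps.findAllDevs().asScala.size  // needs root (or CAP_NET_RAW)
      (host, user, nifs)
    }.distinct().collect()
    report.foreach(println)

With sudo on spark-submit only the driver runs as root; the executors are launched by the standalone Worker daemons, so they run as whatever user started the workers.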
Re: java.io.IOException: No space left on device
Do you have multiple disks? Maybe your work directory is not on the right disk?

On Wed, Apr 29, 2015 at 4:43 PM, Selim Namsi wrote:

Hi, I'm using Spark (1.3.1) MLlib to run the random forest algorithm on tf-idf output; the training data is a file containing 156060 entries (size 8.1M). The problem is that when trying to persist a partition into memory and there is not enough memory, the partition is persisted on disk, and despite having 229G of free disk space, I got "No space left on device". This is how I'm running the program:

    ./spark-submit --class com.custom.sentimentAnalysis.MainPipeline --master local[2] \
      --driver-memory 5g ml_pipeline.jar labeledTrainData.tsv testData.tsv

If you need more information, please let me know. Thanks
Re: java.io.IOException: No space left on device
This is the output of df -h, so as you can see I'm using only one disk, mounted on /:

    $ df -h
    Filesystem      Size  Used Avail Use% Mounted on
    /dev/sda8       276G   34G  229G  13% /
    none            4.0K     0  4.0K   0% /sys/fs/cgroup
    udev            7.8G  4.0K  7.8G   1% /dev
    tmpfs           1.6G  1.4M  1.6G   1% /run
    none            5.0M     0  5.0M   0% /run/lock
    none            7.8G   37M  7.8G   1% /run/shm
    none            100M   40K  100M   1% /run/user
    /dev/sda1       496M   55M  442M  11% /boot/efi

Also, when running the program, I noticed that the Used% disk space for the partition mounted on / was growing very fast.

On Wed, Apr 29, 2015 at 12:19 PM, Anshul Singhle wrote:

Do you have multiple disks? Maybe your work directory is not on the right disk?

...
MLlib KMeans on large dataset issues
Hi Sparkers,

I am trying to run MLlib KMeans on a large dataset (50+ GB of data) with a large K, but I've encountered the following issues:

- The Spark driver runs out of memory and dies because collect gets called as part of KMeans, which loads data back into the driver's memory.
- At the end there is a LocalKMeans class which runs KMeansPlusPlus on the Spark driver. Why isn't this distributed? It spends a long time here, and this has the same problem as point 1: it requires loading data to the driver. Also, when LocalKMeans is running on the driver, I see lots of:

    15/04/29 08:42:25 WARN clustering.LocalKMeans: kMeansPlusPlus initialization ran out of distinct points for centers. Using duplicate point for center k = 222

- Has the above behaviour been like this in previous releases? I remember running KMeans before without too many problems.

Looking forward to hearing you point out my stupidity or provide work-arounds that could make Spark KMeans work well on large datasets.

Regards, Sam Stoelinga
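For context on the LocalKMeans step: in the MLlib implementation of this era, the k-means|| initialization samples candidate centers, collects only those candidates to the driver, and refines them there with LocalKMeans; the main Lloyd's iterations stay distributed. A hedged sketch of sidestepping the expensive initialization by using random init (vectors, the K of 500 and the iteration count are placeholders):

    import org.apache.spark.mllib.clustering.KMeans

    // vectors: RDD[org.apache.spark.mllib.linalg.Vector], cached beforehand
    val model = new KMeans()
      .setK(500)
      .setMaxIterations(20)
      .setInitializationMode(KMeans.RANDOM)  // skip k-means|| and LocalKMeans
      .run(vectors)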
Re: MLlib KMeans on large dataset issues
How are you passing the feature vectors to KMeans? Are they in 2-D space or a 1-D array? Did you try using Streaming KMeans? Will you be able to paste code here?

On 29 April 2015 at 17:23, Sam Stoelinga wrote:

Hi Sparkers, I am trying to run MLlib KMeans on a large dataset (50+ GB of data) with a large K, but I've encountered the following issues: the Spark driver runs out of memory and dies because collect gets called as part of KMeans, and the KMeansPlusPlus step in LocalKMeans runs non-distributed on the driver, spending a long time there and logging many "kMeansPlusPlus initialization ran out of distinct points for centers" warnings. Has the above behaviour been like this in previous releases? ...

Regards, Sam Stoelinga
Re: MLlib KMeans on large dataset issues
I'm mostly using the example code, see here: http://paste.openstack.org/show/211966/

The data has 799305 dimensions and is separated by spaces. Please note that the issues I'm seeing are, in my opinion, caused by the Scala implementation, as they also happen when using the Python wrappers.

On Wed, Apr 29, 2015 at 8:00 PM, Jeetendra Gangele wrote:

How are you passing the feature vectors to KMeans? Are they in 2-D space or a 1-D array? Did you try using Streaming KMeans? Will you be able to paste code here?

...
Re: Driver memory leak?
@Sean, will it be backported to CDH? I didn't find that bug in the CDH 5.4 release notes.

2015-04-29 14:51 GMT+02:00 Conor Fennell:

The memory leak could be related to this defect, which was resolved in Spark 1.2.2 and 1.3.0: https://issues.apache.org/jira/browse/SPARK-5967. It also was a HashMap causing the issue.

-Conor

On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen wrote:

Please use user@, not dev@. This message does not appear to be from your driver. It also doesn't say you ran out of memory; it says you didn't tell YARN to let it use the memory you want. Look at the memory overhead param, and please search first for related discussions.

...
Re: How to group multiple row data ?
Sorry, but I didn't fully understand the grouping, specifically this line: "The group must only take the closest previous trigger. The first one hence shows alone." Can you please explain further?

On Wed, Apr 29, 2015 at 4:42 PM, bipin wrote:

Hi, I have a DataFrame with schema (CustomerID, SupplierID, ProductID, Event, CreatedOn); the first 3 are long ints, Event can only be 1, 2 or 3, and CreatedOn is a timestamp. How can I group these rows into triplets/doublets/singlets such that I can infer that a customer progressed from event 1 to 2, and if present to 3, timewise, while preserving the number of entries? ... Can this be done using Spark SQL? If it needs to be processed functionally in Scala, how would I do that? I can't wrap my head around this. Can anyone help?
Re: java.io.IOException: No space left on device
Sorry, I put the log messages when creating the thread at http://apache-spark-user-list.1001560.n3.nabble.com/java-io-IOException-No-space-left-on-device-td22702.html but I forgot that raw messages are not sent in emails. So this is the log related to the error:

    15/04/29 02:48:50 INFO CacheManager: Partition rdd_19_0 not found, computing it
    15/04/29 02:48:50 INFO BlockManager: Found block rdd_15_0 locally
    15/04/29 02:48:50 INFO CacheManager: Partition rdd_19_1 not found, computing it
    15/04/29 02:48:50 INFO BlockManager: Found block rdd_15_1 locally
    15/04/29 02:49:13 WARN MemoryStore: Not enough space to cache rdd_19_1 in memory! (computed 1106.0 MB so far)
    15/04/29 02:49:13 INFO MemoryStore: Memory use = 234.0 MB (blocks) + 2.6 GB (scratch space shared across 2 thread(s)) = 2.9 GB. Storage limit = 3.1 GB.
    15/04/29 02:49:13 WARN CacheManager: Persisting partition rdd_19_1 to disk instead.
    15/04/29 02:49:28 WARN MemoryStore: Not enough space to cache rdd_19_0 in memory! (computed 1745.7 MB so far)
    15/04/29 02:49:28 INFO MemoryStore: Memory use = 234.0 MB (blocks) + 2.6 GB (scratch space shared across 2 thread(s)) = 2.9 GB. Storage limit = 3.1 GB.
    15/04/29 02:49:28 WARN CacheManager: Persisting partition rdd_19_0 to disk instead.
    15/04/29 03:56:12 WARN BlockManager: Putting block rdd_19_0 failed
    15/04/29 03:56:12 WARN BlockManager: Putting block rdd_19_1 failed
    15/04/29 03:56:12 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 7) java.io.IOException: No space left on device

It seems that the partitions rdd_19_0 and rdd_19_1 together need 2.9 GB.

Thanks

On Wed, Apr 29, 2015 at 12:34 PM, Dean Wampler wrote:

Makes sense. / is where /tmp would be. However, 230G should be plenty of space. If you have INFO logging turned on (set in $SPARK_HOME/conf/log4j.properties), you'll see messages about saving data to disk that will list sizes. The web console also has some summary information about this.

dean

...
Re: Dataframe filter based on another Dataframe
You can use .select to project only the columns you need.

On Wed, Apr 29, 2015 at 9:23 PM, Olivier Girardot wrote:

Hi everyone, what is the most efficient way to filter a DataFrame on a column from another DataFrame's column? The best idea I had was to join the two DataFrames:

    val df1: DataFrame = ...
    val df2: DataFrame = ...
    df1.join(df2, df1("id") === df2("id"), "inner")

But I end up (obviously) with the id column twice. Another approach would be to filter df1, but I can't seem to get this to work using df2's column as a base. Any idea?

Regards, Olivier.

--
Best Regards,
Ayan Guha
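A hedged sketch of the join-then-project approach (the value column name is an assumption; list whichever of df1's columns you need):

    val joined = df1.join(df2, df1("id") === df2("id"), "inner")
    // Project only df1's columns, so the duplicate id column from df2 is dropped.
    val filtered = joined.select(df1("id"), df1("value"))

Referencing columns through the originating DataFrame, df1("..."), is what disambiguates between the two id columns after the join.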
Re: MLlib KMeans on large dataset issues
Guys, great feedback, thanks for pointing out my stupidity :D Rows and columns got intermixed, hence the weird results I was seeing. Ignore my previous issues; I will reformat my data first.

On Wed, Apr 29, 2015 at 8:47 PM, Sam Stoelinga wrote:

I'm mostly using the example code, see here: http://paste.openstack.org/show/211966/. The data has 799305 dimensions and is separated by spaces. ...
DataFrame filter referencing error
Hi all, I was testing the DataFrame filter functionality and I found what I think is a strange behaviour. My DataFrame testDF, obtained by loading a MySQL table via JDBC, has the following schema:

    root
     |-- id: long (nullable = false)
     |-- title: string (nullable = true)
     |-- value: string (nullable = false)
     |-- status: string (nullable = false)

What I want to do is filter my dataset to obtain all rows that have status = "new".

    scala> testDF.filter(testDF("id") === 1234).first()

works fine (also with the integer value within double quotes). However, if I try to use the same statement to filter on the status column (also with changes in the syntax, see below), the program suddenly breaks. Any of the following:

    scala> testDF.filter(testDF("status") === "new")
    scala> testDF.filter("status = 'new'")
    scala> testDF.filter($"status" === "new")

generates the error:

    INFO scheduler.DAGScheduler: Job 3 failed: runJob at SparkPlan.scala:121, took 0.277907 s
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 12, node name): com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'new' in 'where clause'
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
        at com.mysql.jdbc.Util.getInstance(Util.java:386)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1052)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3597)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3529)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1990)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2151)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2625)
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2119)
        at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:2283)
        at org.apache.spark.sql.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:328)
        at org.apache.spark.sql.jdbc.JDBCRDD.compute(JDBCRDD.scala:309)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Does filter work only on columns of integer type? What is the exact behaviour of the filter function, and what is the best way to handle the query I am trying to execute?

Thank you, Francesco
Re: Driver memory leak?
The memory leak could be related to this defect, which was resolved in Spark 1.2.2 and 1.3.0: https://issues.apache.org/jira/browse/SPARK-5967. It also was a HashMap causing the issue.

-Conor

On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen wrote:

Please use user@, not dev@. This message does not appear to be from your driver. It also doesn't say you ran out of memory; it says you didn't tell YARN to let it use the memory you want. Look at the memory overhead param, and please search first for related discussions.

On Apr 29, 2015 11:43 AM, wyphao.2007 wrote:

Hi, dear developers. I am using Spark Streaming to read data from Kafka. The program has already run for about 120 hours, but today it failed because of the driver's OOM ...
Re: DataFrame filter referencing error
Looks like your DF is based on a MySQL DB using JDBC, and the error is thrown from MySQL. Can you see what SQL is finally getting fired in MySQL? Spark is pushing the predicate down to MySQL, so it's not a Spark problem per se.

On Wed, Apr 29, 2015 at 9:56 PM, Francesco Bigarella wrote:

Hi all, I was testing the DataFrame filter functionality and I found what I think is a strange behaviour. ... Any of the following:

    scala> testDF.filter(testDF("status") === "new")
    scala> testDF.filter("status = 'new'")
    scala> testDF.filter($"status" === "new")

generates the error:

    com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Unknown column 'new' in 'where clause'

Does filter work only on columns of integer type? What is the exact behaviour of the filter function, and what is the best way to handle the query I am trying to execute?

Thank you, Francesco

--
Best Regards,
Ayan Guha
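The "Unknown column 'new'" message looks consistent with the pushed-down WHERE clause reaching MySQL with the string literal unquoted (WHERE status = new, so MySQL parses new as a column name). A hedged workaround sketch while on an affected version: cache the DataFrame so the filter is evaluated by Spark's in-memory scan instead of being compiled into the MySQL query:

    // Materialize the JDBC rows in Spark first; subsequent filters run in Spark.
    val localDF = testDF.cache()
    localDF.count()  // force the load
    localDF.filter(localDF("status") === "new").show()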
Re: How to stream all data out of a Kafka topic once, then terminate job?
Yes, and Kafka topics are basically queues. So perhaps what's needed is just a KafkaRDD with the starting offset being 0 and the finishing offset being a very large number...

Sent from my iPhone

On Apr 29, 2015, at 1:52 AM, ayan guha wrote:

I guess what you mean is not streaming. If you create a streaming context at time t, you will receive data arriving from time t onwards, not before t. Looks like you want a queue: let Kafka write to a queue, consume messages from the queue, and stop when the queue is empty.

On 29 Apr 2015 14:35, dgoldenberg wrote:

Hi, I'm wondering about the use-case where you're not doing continuous, incremental streaming of data out of Kafka, but rather want to publish data once with your Producer(s) and consume it once, in your Consumer, then terminate the consumer Spark job.

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(...));

The batchDuration parameter is "the time interval at which streaming data will be divided into batches". Can this be worked somehow to cause Spark Streaming to just get all the available data, then let all the RDDs within the Kafka discretized stream get processed, and then just be done and terminate, rather than wait another period and try to process any more data from Kafka?
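The Kafka integration in Spark 1.3 exposes exactly this as a batch API. A hedged sketch (the broker address, topic name, and offsets are placeholders; unlike the "very large number" idea, the until offset must not exceed the partition's actual latest offset, so in practice you would look the latest offsets up first):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    // One range per topic-partition: (topic, partition, fromOffset, untilOffset).
    val ranges = Array(OffsetRange.create("events", 0, 0L, 1000000L))

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, ranges)
    rdd.map(_._2).foreach(msg => println(msg))  // batch job: process once, then exit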
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
Correct myself: for SparkContext#wholeTextFiles, the RDD's elements are key-value pairs: the key is the file path and the value is the file content. So for SparkContext#wholeTextFiles, the RDD already carries the file information.

bit1...@163.com

From: Saisai Shao
Date: 2015-04-29 15:50

Yes, looks like a solution, but quite tricky. You have to parse the debug string to get the file name, and it also relies on HadoopRDD to get the file name :)

2015-04-29 14:52 GMT+08:00 Akhil Das:

It is possible to access the filename; it's a bit tricky though:

    val fstream = ssc.fileStream[LongWritable, IntWritable,
      SequenceFileInputFormat[LongWritable, IntWritable]]("/home/akhld/input/")
    fstream.foreach { x =>
      // You can get it from this object.
      println(x.values.toDebugString)
    }

Thanks
Best Regards

On Wed, Apr 29, 2015 at 8:33 AM, bit1...@163.com wrote:

For SparkContext#textFile, if a directory is given as the path parameter, it will pick up the files in the directory, so the same thing occurs.

From: Saisai Shao
Date: 2015-04-29 10:54

I think it might be useful in Spark Streaming's file input stream, but I am not sure it is useful in SparkContext#textFile, since we specify the file ourselves, so why would we still need to know the file name? I will open a JIRA to mention this feature. Thanks, Jerry

2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy:

I was wondering about the same thing. Vadim

On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com wrote:

It looks to me that the same thing also applies to SparkContext.textFile and SparkContext.wholeTextFiles: there is no way in the RDD to figure out the file information for the data in the RDD.

From: Saisai Shao
Date: 2015-04-29 10:10

I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this; maybe you could file a JIRA with the wishes you want the community to support, so anyone who is interested can take a crack at it. Thanks, Jerry

2015-04-29 0:13 GMT+08:00 lokeshkumar:

Hi Forum, using Spark Streaming and listening to files in HDFS via the textFileStream/fileStream methods, how do we get the file names which are read by these methods? I used textFileStream, which has the file contents in a JavaDStream, and I got no success with fileStream, as it is throwing me a compilation error with Spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read?

Thanks, Lokesh
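A minimal sketch of the wholeTextFiles alternative mentioned above (the HDFS path is an assumption); each element pairs a file's path with its full content, so the file name travels with the data:

    val kv = sc.wholeTextFiles("hdfs://namenode:8020/input")
    kv.map { case (path, content) => s"$path: ${content.length} chars" }
      .collect()
      .foreach(println)

Note that wholeTextFiles reads each file whole into a single record, so it suits many small files rather than a few large ones.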
Re: ReduceByKey and sorting within partitions
On 04/27/2015 06:00 PM, Ganelin, Ilya wrote:

Marco, why do you want data sorted both within and across partitions? If you need to take an ordered sequence across all your data, you need to either aggregate your RDD on the driver and sort it, or use zipWithIndex to apply an ordered index to your data that matches the order it was stored in on HDFS. You can then get the data in order by filtering based on that index. Let me know if that's not what you need. Thanks!

Basically, after a mapping d -> (k, v), I have to aggregate my data grouped by key, and I also want the output of this aggregation to be sorted. One way to do that is something like:

    flatMapToPair(myMapFunc).reduceByKey(RangePartitioner, myReduceFunc).mapPartitions(i -> sort(i))

But I was thinking that the sorting phase can be pushed down to the shuffle phase, as the same thing is done in sortByKey and repartitionAndSortWithinPartitions, by calling setKeyOrdering on the ShuffledRDD returned by reduceByKey (or combineByKey). Am I wrong? I'm not a Scala programmer; is there an easy way to do that with the actual Java APIs? If not, what is the quickest way to do it in Scala?

Also a more trivial question: I can't find how to use RangePartitioner from Java, because I can't understand what to provide from Java for the Ordering and ClassTag constructor parameters. Where can I find some reference/examples?

Thank you all,
Marco

-----Original Message-----
From: Marco
Sent: Monday, April 27, 2015 07:01 AM Eastern Standard Time
To: user@spark.apache.org
Subject: ReduceByKey and sorting within partitions

Hi, I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartitions), pushing the sorting down to the shuffle machinery of the reducing phase. I think, but maybe I'm wrong, that the correct way to do that is for combineByKey to call setKeyOrdering on the ShuffledRDD that it returns. Am I wrong? Can it be done by a combination of other transformations with the same efficiency?

Thanks, Marco
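In Scala, the closest public-API route is to reduce first and then let repartitionAndSortWithinPartitions perform the range partitioning and per-partition ordering during its shuffle; that is two shuffles in total, since fusing them into one would indeed require setKeyOrdering on the ShuffledRDD, which reduceByKey does not expose through its return type. A sketch, where pairs, myReduceFunc and numPartitions are placeholders:

    import org.apache.spark.RangePartitioner

    // pairs: RDD[(K, V)] with an implicit Ordering[K] in scope
    val reduced = pairs.reduceByKey(myReduceFunc)
    val part = new RangePartitioner(numPartitions, reduced)
    // Keys end up range-partitioned across partitions and sorted within each;
    // the sort happens inside the shuffle, not in a separate mapPartitions pass.
    val sorted = reduced.repartitionAndSortWithinPartitions(part)

From Java, JavaPairRDD offers the same repartitionAndSortWithinPartitions method, with an overload that uses the natural ordering of comparable keys, which sidesteps constructing the Ordering and ClassTag by hand.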
How to set DEBUG level log of spark executor on Standalone deploy mode
Hi, I want to check the DEBUG logs of Spark executors in Standalone deploy mode. But:

1. Setting log4j.properties in the spark/conf folder on the master node and restarting the cluster did not work.
2. Using spark-submit --properties-file log4j.properties just prints debug logs to the screen, but the executor logs still seem to be at INFO level.

So how can I set the log level of Spark executors on Standalone to DEBUG?

Env info: Spark 1.1.0, Standalone deploy mode.

Submit shell:

    bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans \
      --master spark://master:7077 --executor-memory 600m \
      --properties-file log4j.properties \
      lib/spark-examples-1.1.0-hadoop2.3.0.jar hdfs://master:8000/kmeans/data-Kmeans-5.3g 8 1

Thanks!
Wang Haihua
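Executor logging is driven by the log4j.properties each worker JVM sees, so the file has to exist on every worker node, not just the master (and --properties-file is for Spark properties, not log4j configuration). A hedged sketch of one common approach, placing this in $SPARK_HOME/conf/log4j.properties on every node and restarting the workers (scope DEBUG to the packages of interest, since a global DEBUG root is extremely verbose):

    log4j.rootCategory=INFO, console
    log4j.logger.org.apache.spark=DEBUG
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

On Standalone, the executor output then lands in each worker's work/<app-id>/<executor-id>/stderr.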
Equal Height and Depth Binning in Spark
Hi, I am trying to implement equal-depth and equal-height binning methods in Spark. Any insights or existing code for this would be really helpful. Thanks, Kundan
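There doesn't appear to be a built-in API for this in this era of Spark, so here is a hedged Scala sketch of the two common variants, assuming "equal height" means fixed-width intervals and "equal depth" means equal-frequency bins (the sample data and bin count are placeholders):

    val data = sc.parallelize(Seq(1.0, 3.0, 4.0, 7.0, 8.0, 9.0, 12.0, 20.0))
    val numBins = 4

    // equal-width: fixed-size intervals over [min, max] (assumes min < max)
    val (mn, mx) = (data.min(), data.max())
    val width = (mx - mn) / numBins
    val equalWidth = data.map(x => (math.min(((x - mn) / width).toInt, numBins - 1), x))

    // equal-depth: sort, then assign bins by rank so each bin holds ~n/numBins points
    val n = data.count()
    val equalDepth = data.sortBy(identity).zipWithIndex()
      .map { case (x, rank) => ((rank * numBins / n).toInt, x) }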
Re: Re: Spark streaming - textFileStream/fileStream - Get file name
Yes, it looks like a solution, but quite tricky: you have to parse the debug string to get the file name, and it also relies on HadoopRDD to get the file name :) 2015-04-29 14:52 GMT+08:00 Akhil Das ak...@sigmoidanalytics.com: It is possible to access the filename, it's a bit tricky though. val fstream = ssc.fileStream[LongWritable, IntWritable, SequenceFileInputFormat[LongWritable, IntWritable]]("/home/akhld/input/") fstream.foreach(x => { // You can get it with this object. println(x.values.toDebugString) }) Thanks Best Regards On Wed, Apr 29, 2015 at 8:33 AM, bit1...@163.com wrote: For SparkContext#textFile, if a directory is given as the path parameter, then it will pick up the files in the directory, so the same thing will occur. -- bit1...@163.com From: Saisai Shao sai.sai.s...@gmail.com Date: 2015-04-29 10:54 To: Vadim Bichutskiy vadim.bichuts...@gmail.com CC: bit1...@163.com; lokeshkumar lok...@dataken.net; user user@spark.apache.org Subject: Re: Re: Spark streaming - textFileStream/fileStream - Get file name I think it might be useful in Spark Streaming's file input stream, but I'm not sure it is useful in SparkContext#textFile, since we specify the files ourselves, so why would we still need to know the file name? I will open a JIRA to mention this feature. Thanks Jerry 2015-04-29 10:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com: I was wondering about the same thing. Vadim On Tue, Apr 28, 2015 at 10:19 PM, bit1...@163.com wrote: Looks to me that the same thing also applies to SparkContext.textFile and SparkContext.wholeTextFile: there is no way in an RDD to figure out the file information that the data in the RDD came from. -- bit1...@163.com From: Saisai Shao sai.sai.s...@gmail.com Date: 2015-04-29 10:10 To: lokeshkumar lok...@dataken.net CC: spark users user@spark.apache.org Subject: Re: Spark streaming - textFileStream/fileStream - Get file name I think currently there's no API in Spark Streaming you can use to get the file names for file input streams. Actually it is not trivial to support this; maybe you could file a JIRA with the wishes you want the community to support, so anyone who is interested can take a crack at it. Thanks Jerry 2015-04-29 0:13 GMT+08:00 lokeshkumar lok...@dataken.net: Hi Forum, Using Spark Streaming and listening to the files in HDFS using the textFileStream/fileStream methods, how do we get the file names which are read by these methods? I used textFileStream, which has file contents in JavaDStream, and I got no success with fileStream as it is throwing me a compilation error with Spark version 1.3.1. Can someone please tell me if we have an API function or any other way to get the file names that these streaming methods read? Thanks Lokesh
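For batch jobs, one way to recover the file name without parsing debug strings is to drop down to HadoopRDD's developer API; a hedged Scala sketch (the path is a placeholder, and mapPartitionsWithInputSplit is marked @DeveloperApi, so it may change between releases):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
    import org.apache.spark.rdd.HadoopRDD

    val hadoopRdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://some/dir")
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
    val withFileNames = hadoopRdd.mapPartitionsWithInputSplit((split, iter) => {
      // each partition corresponds to one input split, which knows its file
      val file = split.asInstanceOf[FileSplit].getPath.toString
      iter.map { case (_, line) => (file, line.toString) }
    }, preservesPartitioning = true)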
Re: Question about Memory Used and VCores Used
Hi, Good question. The extra memory comes from spark.yarn.executor.memoryOverhead, the space used for the application master, and the way YARN rounds requests up. This explains it in a little more detail: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ -Sandy On Tue, Apr 28, 2015 at 7:12 PM, bit1...@163.com wrote: Hi guys, I have the following computation with 3 workers: spark-sql --master yarn --executor-memory 3g --executor-cores 2 --driver-memory 1g -e 'select count(*) from table' The resources used are shown as below on the UI: I don't understand why the memory used is 15GB and the vcores used is 5. I think the memory used should be executor-memory * numOfWorkers = 3G * 3 = 9G, and the vcores used should be executor-cores * numOfWorkers = 6. Can you please explain the result? Thanks. -- bit1...@163.com
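A rough Scala sketch of the accounting, assuming the defaults of that era (spark.yarn.executor.memoryOverhead = max(384 MB, 7% of executor memory)) and a hypothetical yarn.scheduler.minimum-allocation-mb of 1024; exact numbers depend on your cluster settings:

    val executorMemMb = 3 * 1024
    val overheadMb    = math.max(384, (0.07 * executorMemMb).toInt)
    val minAllocMb    = 1024
    val containerMb   = math.ceil((executorMemMb + overheadMb).toDouble / minAllocMb).toInt * minAllocMb
    // containerMb == 4096: each "3g" executor actually occupies a 4 GB container,
    // and the application master adds one more container on top of the executors.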
Re: How to stream all data out of a Kafka topic once, then terminate job?
I guess what you mean is not streaming. If you create a streaming context at time t, you will receive data coming through starting at time t++, not before time t. It looks like you want a queue: let Kafka write to a queue, consume messages from the queue, and stop when the queue is empty. On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm wondering about the use-case where you're not doing continuous, incremental streaming of data out of Kafka but rather want to publish data once with your Producer(s), consume it once in your Consumer, then terminate the consumer Spark job. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(...)); The batchDuration parameter is "the time interval at which streaming data will be divided into batches". Can this be worked somehow to cause Spark Streaming to just get all the available data, then let all the RDDs within the Kafka discretized stream get processed, and then just be done and terminate, rather than wait another period and try to process any more data from Kafka?
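As of Spark 1.3 there is also a non-streaming path in the Kafka integration that fits this read-once use case: KafkaUtils.createRDD builds a plain batch RDD over an explicit range of offsets, so the job simply ends when the RDD has been processed. A hedged Scala sketch (topic, partition, offsets and broker are placeholders; you would need to discover the real ending offsets yourself):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    // read partition 0 of "myTopic" from offset 0 (inclusive) to 100000 (exclusive)
    val offsetRanges = Array(OffsetRange("myTopic", 0, 0L, 100000L))
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    rdd.map(_._2).foreach(println)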
Re: Multiclass classification using Ml logisticRegression
Thank you for your answer! Yes, I would like to work on it. Selim On Mon, Apr 27, 2015 at 5:23 AM Joseph Bradley jos...@databricks.com wrote: Unfortunately, the Pipelines API doesn't have multiclass logistic regression yet, only binary. It's really a matter of modifying the current implementation; I just added a JIRA for it: https://issues.apache.org/jira/browse/SPARK-7159 You'll need to use the old LogisticRegression API to do multiclass for now, until that JIRA gets completed. (If you're interested in doing it, let me know via the JIRA!) Joseph On Fri, Apr 24, 2015 at 3:26 AM, Selim Namsi selim.na...@gmail.com wrote: Hi, I just started using the Spark ML pipeline to implement a multiclass classifier using LogisticRegressionWithLBFGS (which accepts the number of classes as a parameter). I followed the Pipeline example in the ML guide and used the LogisticRegression class, which calls the LogisticRegressionWithLBFGS class: val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01) The problem is that LogisticRegression doesn't take numClasses as a parameter. Any idea how to solve this problem? Thanks
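A minimal sketch of the old mllib API route Joseph mentions (the data path and class count are placeholders):

    import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
    import org.apache.spark.mllib.util.MLUtils

    // load labeled points; labels should be 0.0 .. k-1 for k classes
    val data = MLUtils.loadLibSVMFile(sc, "data/sample_multiclass_data.txt")
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(3)   // unlike the Pipelines API, this accepts multiclass
      .run(data)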
Re: How to setup this false streaming problem
Hi Toni. Given there is more than one measure per (user, hour), what is the measure you want to keep? The sum? The mean? The most recent measure? For the sum or the mean you don't need to care about the timing. And if you want the most recent, then you can include the timestamp in the reduction function and use it to decide which measure you keep.

On the other hand, given the same userId, do you expect that the measures will arrive reasonably ordered? If that's the case, you may reduce using only userId as the key and use your reduction function to decide when to drop the last hour and publish the next one, maybe using a Map[hour, (timestamp, measure)] as the reduction argument. The expected result of that reduction can be a Map with the significant measures per hour of that DStream. For example: Given a DStream with { UserId = 1, Ts = 8:30, measure = X }, { UserId = 1, Ts = 8:45, measure = Y }, { UserId = 1, Ts = 9:10, measure = Z } Before the reduction: (1, Map(8:00 -> (8:30, X))), (1, Map(8:00 -> (8:45, Y))), (1, Map(9:00 -> (9:10, Z))) After the reduction you can produce (assuming you want to keep the most recent): (1, Map(8:00 -> (8:45, Y), 9:00 -> (9:10, Z))) In a further step you can decide to dump the 8:00 measure to the database because you have, in that DStream, a measure happening at 9:00. This way you can keep a structure that will use almost constant memory per userId, because in the worst case you will have two hours in the map. Regards Nacho

2015-04-28 19:38 GMT+02:00 Toni Cebrián tcebr...@enerbyte.com: Hi, Just new to Spark and in need of some help framing the problem I have. "A problem well stated is half solved", as the saying goes :) Let's say that I have a DStream[String] basically containing JSON of some measurements from IoT devices. In order to keep it simple, say that after unmarshalling I have data like: case class Measurement(val deviceId: Long, val timestamp: Date, val measurement: Double) I need to use DStreams because there is some interest in monitoring the measurements of devices in real time. So let's say that I have a dashboard with hourly granularity; past hours are consolidated but the current hour must be kept updated on every input. The problem is that the time that matters is the timestamp in the JSON, not the receiving timestamp in Spark, so I think that I have to keep a stateful DStream like the one described in http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ . I have two questions here: 1. Once a given hour is gone, I'd like to flush the consolidated stream into a DB. I think the strategy is to have a stream with key-values where the key is (userId, truncateByHour(timestamp)) and reduce over the values. But it seems to me that with this approach Spark has lost any sense of time; how would you flush all the RDDs with timestamps between 00:00:00 and 00:59:59, for instance? Maybe I'm missing some function in the API. 2. How do you deal with events that come with timestamps in the past? Is it a matter of ignoring them and finding a trade-off between memory and how long the stateful DStream is kept? But then, who is the one popping the mature time slices from the stateful stream? For me Spark Streaming would be the most natural way to face this problem, but maybe a simple Spark job run every minute could easily keep the ordering by time of external events. I'd like to hear your thoughts. Toni
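A hedged Scala sketch of Nacho's per-user reduction (the input is assumed to be a DStream[Measurement]; the hour bucketing and the "keep the most recent" policy are spelled out in comments):

    import java.util.Date
    import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits on older releases
    import org.apache.spark.streaming.dstream.DStream

    case class Measurement(deviceId: Long, timestamp: Date, measurement: Double)

    def hourBucket(ts: Date): Long = ts.getTime / (3600 * 1000)

    def latestPerHour(stream: DStream[Measurement]): DStream[(Long, Map[Long, (Long, Double)])] =
      stream
        .map(m => (m.deviceId, Map(hourBucket(m.timestamp) -> (m.timestamp.getTime, m.measurement))))
        .reduceByKey { (a, b) =>
          // merge the two maps hour by hour, keeping the measure with the newest timestamp
          (a.keySet ++ b.keySet).map { h =>
            h -> (a.get(h).toSeq ++ b.get(h).toSeq).maxBy(_._1)
          }.toMap
        }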
Re: Re: Question about Memory Used and VCores Used
Thanks Sandy, it is very useful! bit1...@163.com From: Sandy Ryza Date: 2015-04-29 15:24 To: bit1...@163.com CC: user Subject: Re: Question about Memory Used and VCores Used Hi, Good question. The extra memory comes from spark.yarn.executor.memoryOverhead, the space used for the application master, and the way YARN rounds requests up. This explains it in a little more detail: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ -Sandy On Tue, Apr 28, 2015 at 7:12 PM, bit1...@163.com wrote: Hi guys, I have the following computation with 3 workers: spark-sql --master yarn --executor-memory 3g --executor-cores 2 --driver-memory 1g -e 'select count(*) from table' The resources used are shown as below on the UI: I don't understand why the memory used is 15GB and the vcores used is 5. I think the memory used should be executor-memory * numOfWorkers = 3G * 3 = 9G, and the vcores used should be executor-cores * numOfWorkers = 6. Can you please explain the result? Thanks. bit1...@163.com
Re: sparksql - HiveConf not found during task deserialization
The issue is solved. There was a problem in my hive codebase. Once that was fixed, -Phive-provided spark is working fine against my hive jars. On 27 April 2015 at 08:00, Manku Timma manku.tim...@gmail.com wrote: Made some progress on this. Adding hive jars to the system classpath is needed. But looks like it needs to be towards the end of the system classes. Manually adding the hive classpath into Client.populateHadoopClasspath solved the issue. But a new issue has come up. It looks like some hive initialization needs to happen on the executors but is getting missed out. 15/04/25 07:40:25 ERROR executor.Executor: Exception in task 0.1 in stage 1.0 (TID 23) java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive.get() called without a hive db setup at org.apache.hadoop.hive.ql.plan.PlanUtils.configureJobPropertiesForStorageHandler(PlanUtils.java:841) at org.apache.hadoop.hive.ql.plan.PlanUtils.configureInputJobPropertiesForStorageHandler(PlanUtils.java:776) at org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:253) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$11.apply(TableReader.scala:229) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$11.apply(TableReader.scala:229) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:172) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:172) at scala.Option.map(Option.scala:145) at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:172) at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:216) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:212) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive.get() called without a hive db setup at 
org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:211) at org.apache.hadoop.hive.ql.plan.PlanUtils.configureJobPropertiesForStorageHandler(PlanUtils.java:797) ... 37 more On 25 April 2015 at 09:31, Manku Timma manku.tim...@gmail.com wrote: Setting SPARK_CLASSPATH is triggering other errors. Not working. On 25 April 2015 at 09:16, Manku Timma manku.tim...@gmail.com wrote: Actually found the culprit. The JavaSerializerInstance.deserialize is called with a classloader (of type MutableURLClassLoader) which has access to all the hive classes. But internally it triggers a call to loadClass but with the default classloader. Below is the stacktrace (line numbers in the JavaSerialization.scala will be a bit off due to my debugging statements). I will try out the SPARK_CLASSPATH setting. But I was wondering if this has something to do with the way spark-project.hive jars are created v/s the way open source apache-hive jars are created. Is this documented somewhere? The only info I see is Patrick Wendell's comment in https://github.com/apache/spark/pull/2241 (grep for published a modified version). 15/04/25 01:41:04 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main] java.lang.NoClassDefFoundError:
Re: Parquet error reading data that contains array of structs
Thanks for the detailed information! Now I can confirm that this is a backwards-compatibility issue. The data written by parquet 1.6rc7 follows the standard LIST structure. However, Spark SQL still uses the old parquet-avro style two-level structures, which causes the problem. Cheng

On 4/27/15 7:07 PM, Jianshi Huang wrote: FYI, Parquet schema output:

message pig_schema {
  optional binary cust_id (UTF8);
  optional int32 part_num;
  optional group ip_list (LIST) {
    repeated group ip_t {
      optional binary ip (UTF8);
    }
  }
  optional group vid_list (LIST) {
    repeated group vid_t {
      optional binary vid (UTF8);
    }
  }
  optional group fso_list (LIST) {
    repeated group fso_t {
      optional binary fso (UTF8);
    }
  }
}

And Parquet meta output:

creator: [parquet-mr (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr version 1.6.0rc7 (build ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7), parquet-mr]
extra: pig.schema = cust_id: chararray,part_num: int,ip_list: {ip_t: (ip: chararray)},vid_list: {vid_t: (vid: chararray)},fso_list: {fso_t: (fso: chararray)}

file schema: pig_schema
cust_id:  OPTIONAL BINARY O:UTF8 R:0 D:1
part_num: OPTIONAL INT32 R:0 D:1
ip_list:  OPTIONAL F:1
.ip_t:    REPEATED F:1
..ip:     OPTIONAL BINARY O:UTF8 R:1 D:3
vid_list: OPTIONAL F:1
.vid_t:   REPEATED F:1
..vid:    OPTIONAL BINARY O:UTF8 R:1 D:3
fso_list: OPTIONAL F:1
.fso_t:   REPEATED F:1
..fso:    OPTIONAL BINARY O:UTF8 R:1 D:3

row group 1: RC:1201092 TS:537930256 OFFSET:4
cust_id:  BINARY GZIP DO:0 FPO:4 SZ:10629422/27627221/2.60 VC:1201092 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:10629426 SZ:358/252/0.70 VC:1201092 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip:     BINARY GZIP DO:0 FPO:10629784 SZ:41331065/180501686/4.37 VC:10540378 ENC:PLAIN,RLE
vid_list:
.vid_t:
..vid:    BINARY GZIP DO:0 FPO:51960849 SZ:58820404/254819721/4.33 VC:11011894 ENC:PLAIN,RLE
fso_list:
.fso_t:
..fso:    BINARY GZIP DO:0 FPO:110781253 SZ:21363255/74981376/3.51 VC:5612655 ENC:PLAIN,RLE

row group 2: RC:1830769 TS:1045506907 OFFSET:132144508
cust_id:  BINARY GZIP DO:0 FPO:132144508 SZ:17720131/42110882/2.38 VC:1830769 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:149864639 SZ:486/346/0.71 VC:1830769 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip:     BINARY GZIP DO:0 FPO:149865125 SZ:37687630/342050955/9.08 VC:20061916 ENC:PLAIN,PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid:    BINARY GZIP DO:0 FPO:187552755 SZ:56498124/516700215/9.15 VC:22410351 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso:    BINARY GZIP DO:0 FPO:244050879 SZ:20110276/144644509/7.19 VC:10739272 ENC:PLAIN,PLAIN_DICTIONARY,RLE

row group 3: RC:22445 TS:4304290 OFFSET:264161155
cust_id:  BINARY GZIP DO:0 FPO:264161155 SZ:221527/516312/2.33 VC:22445 ENC:PLAIN,RLE,BIT_PACKED
part_num: INT32 GZIP DO:0 FPO:264382682 SZ:102/64/0.63 VC:22445 ENC:PLAIN_DICTIONARY,RLE,BIT_PACKED
ip_list:
.ip_t:
..ip:     BINARY GZIP DO:0 FPO:264382784 SZ:483962/1204312/2.49 VC:123097 ENC:PLAIN_DICTIONARY,RLE
vid_list:
.vid_t:
..vid:    BINARY GZIP DO:0 FPO:264866746 SZ:622977/2122080/3.41 VC:133136 ENC:PLAIN,PLAIN_DICTIONARY,RLE
fso_list:
.fso_t:
..fso:    BINARY GZIP DO:0 FPO:265489723 SZ:240588/461522/1.92 VC:62173 ENC:PLAIN_DICTIONARY,RLE

Jianshi

On Mon, Apr 27, 2015 at 12:40 PM, Cheng Lian lian.cs@gmail.com wrote: Had an offline discussion with Jianshi, the dataset was generated by Pig. Jianshi - Could you please attach the output of parquet-schema path-to-parquet-file? I guess this is a Parquet format backwards-compatibility issue.
Parquet hadn't standardized the representation of LIST and MAP until recently, thus many systems made their own choices and are not easily interoperable. In earlier days, Spark SQL used LIST and MAP formats similar to Avro's, which were unfortunately not chosen as the current standard format. Details can be found here: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md This document also defines backwards-compatibility rules to handle legacy Parquet data written by old Parquet implementations in various systems. So ideally, Spark SQL should now always write data following the standard, and implement all backwards-compatibility rules to read legacy data. The JIRA issue for this is https://issues.apache.org/jira/browse/SPARK-6774 I'm working on a PR https://github.com/apache/spark/pull/5422 for this. To fix
Re: Multiclass classification using Ml logisticRegression
Wrapping the old LogisticRegressionWithLBFGS could be a quick solution for 1.4, and it's not too hard, so it could potentially get into 1.4. In the long term, I would like to implement a new version like https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef which handles the scaling and intercepts implicitly in the objective function, so there is no overhead of creating a new transformed dataset. Sincerely, DB Tsai --- Blog: https://www.dbtsai.com On Wed, Apr 29, 2015 at 1:21 AM, selim namsi selim.na...@gmail.com wrote: Thank you for your answer! Yes, I would like to work on it. Selim On Mon, Apr 27, 2015 at 5:23 AM Joseph Bradley jos...@databricks.com wrote: Unfortunately, the Pipelines API doesn't have multiclass logistic regression yet, only binary. It's really a matter of modifying the current implementation; I just added a JIRA for it: https://issues.apache.org/jira/browse/SPARK-7159 You'll need to use the old LogisticRegression API to do multiclass for now, until that JIRA gets completed. (If you're interested in doing it, let me know via the JIRA!) Joseph On Fri, Apr 24, 2015 at 3:26 AM, Selim Namsi selim.na...@gmail.com wrote: Hi, I just started using the Spark ML pipeline to implement a multiclass classifier using LogisticRegressionWithLBFGS (which accepts the number of classes as a parameter). I followed the Pipeline example in the ML guide and used the LogisticRegression class, which calls the LogisticRegressionWithLBFGS class: val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01) The problem is that LogisticRegression doesn't take numClasses as a parameter. Any idea how to solve this problem? Thanks
How can I merge multiple DataFrames and remove duplicated keys
I have multiple DataFrame objects, each stored in a parquet file. Each DataFrame just contains 3 columns (id, value, timeStamp). I need to union all the DataFrame objects together, but for duplicated ids keep only the record with the latest timestamp. How can I do that? I can do this for RDDs by sc.union() to union all the RDDs and then a reduceByKey() to remove duplicated ids, keeping only the one with the latest timeStamp field. But how do I do it for DataFrames? Ningjun
Re: Spark on Cassandra
Hadoop version doesn't matter if you're just using Cassandra. On Wed, Apr 29, 2015 at 12:08 PM, Matthew Johnson matt.john...@algomi.com wrote: Hi all, I am new to Spark, but excited to use it with our Cassandra cluster. I have read in a few places that Spark can interact directly with Cassandra now, so I decided to download it and have a play – I am happy to run it in standalone cluster mode initially. When I go to download it (http://spark.apache.org/downloads.html) I see a bunch of pre-built versions for Hadoop and MapR, but no mention of Cassandra – if I am running it in standalone cluster mode, does it matter which pre-built package I download? Would all of them work? Or do I have to build it myself from source with some special config for Cassandra? Thanks! Matt
Re: Driver memory leak?
Not sure what you mean. It's already in CDH, since 5.4 = 1.3.0. (This isn't the place to ask about CDH.) I also don't think that's the problem: the process did not run out of memory. On Wed, Apr 29, 2015 at 2:08 PM, Serega Sheypak serega.shey...@gmail.com wrote: The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0. @Sean Will it be backported to CDH? I didn't find that bug in the CDH 5.4 release notes. 2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com: The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0. It also was a HashMap causing the issue. -Conor On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote: Please use user@, not dev@. This message does not appear to be from your driver. It also doesn't say you ran out of memory; it says you didn't tell YARN to let it use the memory you want. Look at the memory overhead param and please search first for related discussions. On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote: Hi, dear developers, I am using Spark Streaming to read data from Kafka. The program has already run for about 120 hours, but today it failed because of the driver's OOM, as follows: Container [pid=49133,containerID=container_1429773909253_0050_02_01] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 3.2 GB of 50 GB virtual memory used. Killing container. I set --driver-memory to 2g. In my mind, the driver is responsible for job scheduling and job monitoring (please correct me if I'm wrong); why is it using so much memory? So I used jmap to monitor another program (already running about 48 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256. The result shows that the java.util.HashMap$Entry and java.lang.Long objects use about 600MB of memory! I also used jmap to monitor yet another program (already running about 1 hour); there the java.util.HashMap$Entry and java.lang.Long objects don't use so much memory. But I found that, as time goes by, the java.util.HashMap$Entry and java.lang.Long objects occupy more and more memory. Is it a driver memory leak, or some other reason? Thanks Best Regards
Spark on Cassandra
Hi all, I am new to Spark, but excited to use it with our Cassandra cluster. I have read in a few places that Spark can interact directly with Cassandra now, so I decided to download it and have a play – I am happy to run it in standalone cluster mode initially. When I go to download it ( http://spark.apache.org/downloads.html) I see a bunch of pre-built versions for Hadoop and MapR, but no mention of Cassandra – if I am running it in standalone cluster mode, does it matter which pre-built package I download? Would all of them work? Or do I have to build it myself from source with some special config for Cassandra? Thanks! Matt
Re: How Spark SQL supports primary and secondary indexes
I'm running this query with different parameters on the same RDD and got 0.2s for each query.
Re: indexing an RDD [Python]
Hey Roberto, You will likely want to use a cogroup() then, but it all hinges on how your data looks, i.e. whether you have the index in the key. Here's an example: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#cogroup . Clone: RDDs are immutable, so if you need to make changes, those will result in a new RDD. Best, -Sven On Fri, Apr 24, 2015 at 4:49 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: Hi, I may need to read many values. The list [0,4,5,6,8] is the locations of the rows I'd like to extract from the RDD (of LabeledPoints). Could you possibly provide a quick example? Also, I'm not quite sure how this works, but the resulting RDD should be a clone, as I may need to modify the values and preserve the original ones. Thank you, From: Sven Krasser [mailto:kras...@gmail.com] Sent: Friday, April 24, 2015 5:56 PM To: Pagliari, Roberto Cc: user@spark.apache.org Subject: Re: indexing an RDD [Python] The solution depends largely on your use case. I assume the index is in the key. In that case, you can make a second RDD out of the list of indices and then use cogroup() on both. If the list of indices is small, just using filter() will work well. If you need to read back a few select values to the driver, take a look at lookup(). On Fri, Apr 24, 2015 at 1:51 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I have an RDD of LabeledPoints. Is it possible to select a subset of it based on a list of indices? For example with idx=[0,4,5,6,8], I'd like to be able to create a new RDD with elements 0,4,5,6 and 8. -- www.skrasser.com
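A hedged sketch of the index-and-join approach (written in Scala for brevity, though the thread is about PySpark; the equivalent zipWithIndex/join/map calls exist there too):

    // select rows of an RDD by position; the result is a new RDD,
    // so the original is preserved (RDDs are immutable)
    val rdd = sc.parallelize(Seq("a", "b", "c", "d", "e", "f", "g", "h", "i"))
    val wanted = sc.parallelize(Seq(0L, 4L, 5L, 6L, 8L)).map(i => (i, ()))
    val byIndex = rdd.zipWithIndex().map { case (v, i) => (i, v) }
    val subset = byIndex.join(wanted).map { case (i, (v, _)) => (i, v) }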
Compute pairwise distance
Dear Sparkers, I am working on an algorithm which requires the pairwise distance between all points (e.g. DBScan, LOF, etc.). Computing this for n points will produce an n^2 matrix. If the distance measure is symmetrical, this can be reduced to n^2/2. What would be the most optimal way of computing this? The paper Pairwise Element Computation with MapReduce https://www.cs.ucsb.edu/~ravenben/classes/290F/papers/kvl10.pdf describes different approaches to optimizing this process within a map-reduce model, although I don't believe it is directly applicable to Spark. How would you guys approach this? I first thought about broadcasting the original points to all the workers and then computing the distances across the different workers, although this requires all the points to be available on every machine. That feels rather brute-force; what do you guys think? I don't expect full solutions, but some pointers would be great. I think a good solution can be re-used for many algorithms. Kind regards, Fokko Driesprong
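For reference, a hedged brute-force sketch using cartesian(), keeping only the upper triangle since the metric is symmetric (the sample points are placeholders; for real sizes you would want to group points into blocks and pair blocks, rather than pair points one by one):

    def euclidean(a: Array[Double], b: Array[Double]): Double =
      math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

    val points = sc.parallelize(Seq(Array(0.0, 0.0), Array(1.0, 1.0), Array(2.0, 0.0)))
    val indexed = points.zipWithIndex().map(_.swap)        // (id, point)
    val pairs = indexed.cartesian(indexed)
      .filter { case ((i, _), (j, _)) => i < j }           // n^2/2 pairs, not n^2
    val distances = pairs.map { case ((i, a), (j, b)) => ((i, j), euclidean(a, b)) }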
Re: Slower performance when bigger memory?
On Mon, Apr 27, 2015 at 7:36 AM, Shuai Zheng szheng.c...@gmail.com wrote: Thanks. So may I know your configuration for more/smaller executors on r3.8xlarge, and how much memory you eventually decided to give one executor without impacting performance (for example: 64g)? We're currently using 16 executors with 2 cores each per instance. This is mainly due to the S3 throughput observations I mentioned (all based on earlier versions, so I would not be surprised if larger executors work equally well now). Among these, we chop the available memory up equally, but we still leave a lot of extra room for Python workers (as a result of SPARK-5395; since that's fixed, we have more room to grow the JVM heap now). Best, -Sven -- www.skrasser.com
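For illustration only (the flags and numbers are assumptions, not Sven's exact setup): a submit line in the spirit of "many small executors" on a single r3.8xlarge (32 vcores, 244 GB), running on YARN, might look like

    spark-submit \
      --num-executors 16 \
      --executor-cores 2 \
      --executor-memory 10g \
      ...

leaving a sizeable slice of the instance's memory unassigned for Python workers and OS page cache.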
Sort (order by) of the big dataset
Hi, I have a dataset of 2 billion records with schema eventId: String, time: Double, value: Double. It is stored in Parquet format in HDFS, size 23GB. Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon CPUs, 16GB RAM and 1TB disk space each; each node has 3 workers with 3GB memory. I keep failing to sort the mentioned dataset in Spark. I do the following: val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet") pf.registerTempTable("data") val sorted = sqlContext.sql("select * from data order by time") sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet") Spark starts to execute tasks and then errors like "Executor Lost" pop up in the web UI (tasks mapPartitions at Exchange.scala and runJob at newParquet.scala), giving different types of exceptions in explanation. My thinking is that the main problem is the "GC overhead limit exceeded" exception; however, I also observe exceptions related to connection timeouts and shuffle writes (java.lang.IllegalStateException: Shutdown in progress; org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException). What I tried: 1) Tried to order by eventId and by both (order by eventId, time) with the same result. 2) Looked at the shuffle parameters, but the defaults do make sense. 3) Tried to repartition the data I am loading from Parquet: val pf3000 = pf.repartition(3000) in order to get smaller chunks of data passed to executors (originally there are 300 partitions). It did not help either. Surprisingly, this dataset takes 50GB on HDFS versus the 23GB that the original took. 4) Tried to have 8 workers instead of 24 and gave them 10G of memory each. It did not help. Could you suggest what might be the problem and what is the workaround? Just in case, I cannot have more RAM or more machines :) Best regards, Alexander
Performance advantage by loading data from local node over S3.
Hi all, I'm new to Spark so I'm sorry if the question is too vague. I'm currently trying to deploy a Spark cluster using YARN on an Amazon EMR cluster. For data storage I'm currently using S3, but would loading the data into HDFS from the local nodes give a considerable performance advantage over loading from S3? Would the reduced latency in the data load affect the runtime significantly, considering most of the computation is done in memory? Thank you, Nisrina.
RE: Spark on Cassandra
http://planetcassandra.org/getting-started-with-apache-spark-and-cassandra/ http://planetcassandra.org/blog/holy-momentum-batman-spark-and-cassandra-circa-2015-w-datastax-connector-and-java/ https://github.com/datastax/spark-cassandra-connector From: Cody Koeninger [mailto:c...@koeninger.org] Sent: Wednesday, April 29, 2015 12:15 PM To: Matthew Johnson Cc: user@spark.apache.org Subject: Re: Spark on Cassandra Hadoop version doesn't matter if you're just using Cassandra. On Wed, Apr 29, 2015 at 12:08 PM, Matthew Johnson matt.john...@algomi.com wrote: Hi all, I am new to Spark, but excited to use it with our Cassandra cluster. I have read in a few places that Spark can interact directly with Cassandra now, so I decided to download it and have a play – I am happy to run it in standalone cluster mode initially. When I go to download it (http://spark.apache.org/downloads.html) I see a bunch of pre-built versions for Hadoop and MapR, but no mention of Cassandra – if I am running it in standalone cluster mode, does it matter which pre-built package I download? Would all of them work? Or do I have to build it myself from source with some special config for Cassandra? Thanks! Matt
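A hedged first-contact sketch with the DataStax connector (keyspace, table, host and the connector version are all placeholders; check the connector's compatibility table for your Spark version):

    // build.sbt: libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.1"
    import com.datastax.spark.connector._
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("cassandra-smoke-test")
      .set("spark.cassandra.connection.host", "10.0.0.1")
    val sc = new SparkContext(conf)

    val rdd = sc.cassandraTable("my_keyspace", "my_table")  // RDD of CassandraRow
    println(rdd.count)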
Re: Extra stage that executes before triggering computation with an action
Thanks for the responses. "Try removing toDebugString and see what happens": the toDebugString is performed after [d] (the action), as [e]. By then all stages have already executed.
Re: Extra stage that executes before triggering computation with an action
I'm not sure, but I wonder if, because you are using the Spark REPL, it may not represent what a normal runtime execution would look like, and is possibly eagerly running a partial DAG once you define an operation that would cause a shuffle. What happens if you set up your same set of commands [a-e] in a file and use the Spark REPL's `load` or `paste` command to load them all at once? On Wed, Apr 29, 2015 at 2:55 PM, Tom Hubregtsen thubregt...@gmail.com wrote: Thanks for the responses. "Try removing toDebugString and see what happens": the toDebugString is performed after [d] (the action), as [e]. By then all stages have already executed.
Extra stage that executes before triggering computation with an action
Hi, I am trying to see exactly what happens under the hood of Spark when performing a simple sortByKey. So far I've already discovered the fetch files and both the temp-shuffle and shuffle files being written to disk, but there is still an extra stage that keeps on puzzling me. This is the code that I execute one by one in the spark-shell (I will refer to the lines as [letter]):

val input = sc.textFile(path, 2) [a]
val inputRDD = input.map(some lambda to create key/value pairs) [b]
val result = inputRDD.sortByKey(true, 2) [c]
result.saveAsTextFile [d]

As there is only one shuffle, I expect to see two stages. This is confirmed by result.toDebugString [e]:

(2) ShuffledRDD[5] at sortByKey at <console>:25 [] - [c]
 +-(2) MapPartitionsRDD[2] at map at <console>:23 [] - [b]
    | ./Sort/input-10-records-2-parts/ MapPartitionsRDD[1] at textFile at <console>:21 [] - [a]
    | ./Sort/input-10-records-2-parts/ HadoopRDD[0] at textFile at <console>:21 [] - [a]

As there is one indentation, there should be 2 stages. There is an extra RDD (MapPartitionsRDD[6]) that is created by [d], but it is not a parent of my result RDD, so it is not listed in this trace. Now when I run these commands one by one in the spark-shell, I see the following execution:

[a] [b] [c] (// no action performed yet)
INFO SparkContext: Starting job: sortByKey at <console>:25
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at sortByKey at <console>:25), which has no missing parents
INFO DAGScheduler: ResultStage 0 (sortByKey at <console>:25) finished in 0.109 s
INFO DAGScheduler: Job 0 finished: sortByKey at <console>:25
[d] (// Here I trigger the computation with an actual action)
INFO SparkContext: Starting job: saveAsTextFile at <console>:28
INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[2] at map at <console>:23), which has no missing parents
INFO DAGScheduler: ShuffleMapStage 1 (map at <console>:23) finished
INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at saveAsTextFile at <console>:28)
INFO DAGScheduler: ResultStage 2 (saveAsTextFile at <console>:28) finished
INFO DAGScheduler: Job 1 finished: saveAsTextFile at <console>:28

Job 1, with stages 1 and 2, seems logical to me: it computes everything before and after the shuffle (wide dependency), respectively. Now what I find interesting and puzzling is Job 0 with stage 0: it executes and finishes before I perform an action (in [d]), and with a larger input set it can also take a noticeable amount of time. Does anybody have any idea what is running in this Job 0/stage 0? Thanks, Tom Hubregtsen
Hardware provisioning for Spark SQl
Hi all, I have to estimate resource requirements for my Hadoop/Spark cluster. In particular, I have to query about 100TB of HBase tables to do aggregation with Spark SQL. What is, approximately, the most suitable cluster configuration for my use case, in order to query the data quickly? Eventually I have to develop an online analytical application on top of these data. I would like to know what kind of nodes I should configure to achieve the goal. How much RAM, and how many cores and disks should these nodes have? Thanks in advance. Best regards, Pietro
Re: Too many open files when using Spark to consume messages from Kafka
Thanks for the suggestion. I ran the command and the limit is 1024. Based on my understanding, the connector to Kafka should not open so many files. Do you think there is possible socket leakage? BTW, in every batch, which is 5 seconds, I output some results to MySQL:

def ingestToMysql(data: Array[String]) {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  var sql = "insert into loggingserver1 values "
  data.foreach(line => sql += line)
  sql = sql.dropRight(1)
  sql += ";"
  logger.info(sql)
  var conn: java.sql.Connection = null
  try {
    conn = DriverManager.getConnection(url)
    val statement = conn.createStatement()
    statement.executeUpdate(sql)
  } catch {
    case e: Exception => logger.error(e.getMessage())
  } finally {
    if (conn != null) {
      conn.close
    }
  }
}

I am not sure whether the leakage originates from the Kafka connector or the SQL connections. Bill On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote: Can you run the command 'ulimit -n' to see the current limit? To configure ulimit settings on Ubuntu, edit /etc/security/limits.conf Cheers On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am using the direct approach to receive real-time data from Kafka in the following link: https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html My code follows the word count direct example: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala After around 12 hours, I got the following error messages in the Spark log: 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 143033869 ms org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at
multiple programs compilation by sbt.
Hi, Following the Quick Start guide: https://spark.apache.org/docs/latest/quick-start.html I could compile and run a Spark program successfully; now my question is how to compile multiple programs with sbt in a batch. E.g., two programs as:

./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp_A.scala
./src/main/scala/SimpleApp_B.scala

Hopefully, with sbt package, I will get two .jar files, one for each of the source programs, so I can run them separately in Spark. I tried to create two .sbt files, one per program, but found that only one .jar file is created.

./simpleA.sbt:
name := "Simple Project A"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

./simpleB.sbt:
name := "Simple Project B"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

Does anybody know how to do it? Cheers, Dan
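sbt builds one jar per project, not one per .sbt file in the same directory, so one way to get two jars (a hedged sketch; project names and sub-directories are placeholders) is a multi-project build.sbt where each app lives in its own sub-directory:

    // build.sbt (sbt 0.13 syntax)
    lazy val commonSettings = Seq(
      version := "1.0",
      scalaVersion := "2.10.4",
      libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
    )

    lazy val appA = (project in file("app-a"))
      .settings(commonSettings: _*)
      .settings(name := "Simple Project A")

    lazy val appB = (project in file("app-b"))
      .settings(commonSettings: _*)
      .settings(name := "Simple Project B")

Running sbt package then produces one jar per sub-project (or sbt appA/package for just one of them).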
Re: Too many open files when using Spark to consume messages from Kafka
Can you run the command 'ulimit -n' to see the current limit ? To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf* Cheers On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am using the direct approach to receive real-time data from Kafka in the following link: https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html My code follows the word count direct example: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala After around 12 hours, I got the following error messages in Spark log: 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 143033869 ms org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at
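On the ulimit side, raising the cap usually means editing /etc/security/limits.conf and re-opening the session; an illustrative entry (the user name and values are placeholders, not a recommendation for this workload):

    # /etc/security/limits.conf
    spark  soft  nofile  32768
    spark  hard  nofile  32768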
Re: Too many open files when using Spark to consume messages from Kafka
Maybe add statement.close() in finally block ? Streaming / Kafka experts may have better insight. On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Thanks for the suggestion. I ran the command and the limit is 1024. Based on my understanding, the connector to Kafka should not open so many files. Do you think there is possible socket leakage? BTW, in every batch, which is 5 seconds, I output some results to mysql:

def ingestToMysql(data: Array[String]) {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  var sql = "insert into loggingserver1 values "
  data.foreach(line => sql += line)
  sql = sql.dropRight(1)
  sql += ";"
  logger.info(sql)
  var conn: java.sql.Connection = null
  try {
    conn = DriverManager.getConnection(url)
    val statement = conn.createStatement()
    statement.executeUpdate(sql)
  } catch {
    case e: Exception => logger.error(e.getMessage())
  } finally {
    if (conn != null) {
      conn.close
    }
  }
}

I am not sure whether the leakage originates from the Kafka connector or the sql connections. Bill On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote: Can you run the command 'ulimit -n' to see the current limit ? To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf* Cheers On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com wrote: [...]
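A minimal sketch of the pattern Ted is suggesting, with the Statement closed alongside the Connection in the finally block. The url and table name are carried over from Bill's snippet; error handling is simplified to printStackTrace so the sketch is self-contained:

import java.sql.{Connection, DriverManager, Statement}

def ingestToMysql(data: Array[String]): Unit = {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  var sql = "insert into loggingserver1 values "
  data.foreach(line => sql += line)
  sql = sql.dropRight(1) + ";"
  var conn: Connection = null
  var statement: Statement = null
  try {
    conn = DriverManager.getConnection(url)
    statement = conn.createStatement()
    statement.executeUpdate(sql)
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    // close the Statement as well as the Connection, so neither
    // waits on GC to release its underlying descriptors
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}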
Re: HOw can I merge multiple DataFrame and remove duplicated key
It's no different; you would use group by and an aggregate function to do so. On 30 Apr 2015 02:15, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have multiple DataFrame objects, each stored in a parquet file. The DataFrame just contains 3 columns (id, value, timeStamp). I need to union all the DataFrame objects together, but for a duplicated id only keep the record with the latest timestamp. How can I do that? I can do this for RDDs with sc.union() to union all the RDDs and then a reduceByKey() to remove duplicated ids by keeping only the one with the latest timeStamp field. But how do I do it for DataFrames? Ningjun
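A sketch of that group-by approach with the DataFrame API (df1/df2 are placeholders for the parquet-backed frames; rows tied on both id and timeStamp would survive as duplicates): compute the max timestamp per id, then join it back to keep only the winning rows.

import org.apache.spark.sql.functions.max

val all = df1.unionAll(df2)  // chain more unionAll calls for more frames
val latest = all.groupBy(all("id")).agg(max(all("timeStamp")).as("maxTs"))
val deduped = all
  .join(latest, all("id") === latest("id") && all("timeStamp") === latest("maxTs"))
  .select(all("id"), all("value"), all("timeStamp"))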
Too many open files when using Spark to consume messages from Kafka
Hi all, I am using the direct approach to receive real-time data from Kafka in the following link: https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html My code follows the word count direct example: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala After around 12 hours, I got the following error messages in Spark log: 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 143033869 ms org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94) at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at
Re: Extra stage that executes before triggering computation with an action
I'm not sure, but I wonder whether, because you are using the Spark REPL, it may not represent what a normal runtime execution would look like, and is possibly eagerly running a partial DAG once you define an operation that would cause a shuffle. What happens if you set up your same set of commands [a-e] in a file and use the Spark REPL's `load` or `paste` command to load them all at once? From Richard: I have also packaged it in a jar file (without [e], the debug string), and still see the extra stage before the other two that I would expect. Even when I remove [d], the action, I still see stage 0 being executed (and do not see stages 1 and 2). Again a shortened log of Stage 0: INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at sortByKey), which has no missing parents INFO DAGScheduler: ResultStage 0 (sortByKey) finished in 0.192 s
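For what it's worth, a plausible mechanical explanation for the stage 0 above (hedged, but consistent with it appearing at sortByKey in both the REPL and the packaged jar): RDD.sortByKey constructs a RangePartitioner, and computing the range boundaries eagerly submits a small sampling job, so a stage can appear in the UI before any action runs. A minimal reproduction:

val pairs = sc.parallelize(1 to 1000).map(i => (i.toString, i))
// sortByKey is a transformation, but building its RangePartitioner
// samples the keys immediately -- a stage shows up with no action called
val sorted = pairs.sortByKey()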
Re: Too many open files when using Spark to consume messages from Kafka
Is the function ingestToMysql running on the driver or on the executors? Accordingly you can try debugging while running in a distributed manner, with and without calling the function. If you don't get too many open files without calling ingestToMysql(), the problem is likely to be in ingestToMysql(). If you get the problem even without calling ingestToMysql(), then the problem may be in Kafka. If the problem is occurring in the driver, then it's the DirectKafkaInputDStream code. If the problem is occurring in the executor, then the problem is in KafkaRDD. TD On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote: [...]
Re: Join between Streaming data vs Historical Data in spark
Have you taken a look at the join section in the streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html#stream-dataset-joins On Wed, Apr 29, 2015 at 7:11 AM, Rendy Bambang Junior rendy.b.jun...@gmail.com wrote: [...]
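A minimal sketch of the stream-dataset join pattern from that guide, under assumed shapes (visits as a cached pair RDD keyed by userId, transactions as a DStream of (userId, totalPrice); the path and line format are hypothetical):

import org.apache.spark.streaming.dstream.DStream

// historical visit data, keyed by userId; loaded once and cached
val visits = sc.textFile("hdfs:///data/visits")  // lines like "userId,visitSource"
  .map { line => val f = line.split(","); (f(0), f(1)) }
  .cache()

// join each streaming batch of transactions against the static RDD
def attribute(transactions: DStream[(String, Double)]): DStream[(String, (Double, String))] =
  transactions.transform(rdd => rdd.join(visits))

For truly large history, the static side would itself need to be partitioned sensibly (or the lookup inverted), which is the scalability question Rendy raises.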
Kryo serialization of classes in additional jars
Hi, Is it possible to register kryo serialization for classes contained in jars that are added with spark.jars? In my experiment it doesn't seem to work, likely because the class registration happens before the jar is shipped to the executor and added to the classloader. Here's the general idea of what I want to do:

val sparkConf = new SparkConf(true)
  .set("spark.jars", "foo.jar")
  .setAppName("foo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// register classes contained in foo.jar
sparkConf.registerKryoClasses(Array(
  classOf[com.foo.Foo],
  classOf[com.foo.Bar]))
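One pattern worth trying here (an assumption on my part, not something confirmed in this thread): name a KryoRegistrator by string instead of registering Class objects up front, so the classes are only resolved on each executor after foo.jar is on its classpath. com.foo.FooRegistrator is hypothetical:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// shipped inside foo.jar alongside Foo and Bar
class FooRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[com.foo.Foo])
    kryo.register(classOf[com.foo.Bar])
  }
}

val sparkConf = new SparkConf(true)
  .set("spark.jars", "foo.jar")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // resolved lazily by class name, rather than eagerly on the driver
  .set("spark.kryo.registrator", "com.foo.FooRegistrator")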
Re: Too many open files when using Spark to consume messages from Kafka
This function is called in foreachRDD. I think it should be running in the executors. I added statement.close() in the code and it is running. I will let you know if this fixes the issue. On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote: [...]
Re: Compute pairwise distance
This is my first thought, please suggest any further improvement: 1. Create an rdd of your dataset 2. Do a cross join to generate pairs 3. Apply reduceByKey and compute the distance. You will get an rdd with key pairs and distances. Best Ayan On 30 Apr 2015 06:11, Driesprong, Fokko fo...@driesprong.frl wrote: Dear Sparkers, I am working on an algorithm which requires the pair distance between all points (eg. DBScan, LOF, etc.). Computing this for *n* points will produce an n^2 matrix. If the distance measure is symmetrical, this can be reduced to (n^2)/2. What would be the most optimal way of computing this? The paper *Pairwise Element Computation with MapReduce https://www.cs.ucsb.edu/~ravenben/classes/290F/papers/kvl10.pdf* describes different approaches to optimize this process within a map-reduce model, although I don't believe this is applicable to Spark. How would you guys approach this? I first thought about broadcasting the original points to all the workers, and then computing the distances across the different workers. Although this requires all the points to be distributed across all the machines, it feels rather brute-force. What do you guys think? I don't expect full solutions, but some pointers would be great. I think a good solution can be re-used for many algorithms. Kind regards, Fokko Driesprong
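A brute-force sketch of that cross-join idea using RDD.cartesian (Spark's cross join), with an i < j filter exploiting the symmetry Fokko mentions; points is a hypothetical Seq[Array[Double]]:

val pts = sc.parallelize(points).zipWithIndex().map(_.swap)  // (id, vector)
val dists = pts.cartesian(pts)
  .filter { case ((i, _), (j, _)) => i < j }  // keep each unordered pair once
  .map { case ((i, a), (j, b)) =>
    val d = math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)
    ((i, j), d)
  }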
Re: multiple programs compilation by sbt.
HI, Ted, I will have a look at it, thanks a lot. Cheers, Dan On Apr 29, 2015 at 5:00 PM, Ted Yu yuzhih...@gmail.com wrote: Have you looked at http://www.scala-sbt.org/0.13/tutorial/Multi-Project.html ? Cheers On Wed, Apr 29, 2015 at 2:45 PM, Dan Dong dongda...@gmail.com wrote: Hi, Following the Quick Start guide: https://spark.apache.org/docs/latest/quick-start.html I could compile and run a Spark program successfully; now my question is how to compile multiple programs with sbt in one go. E.g., two programs as:

./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp_A.scala
./src/main/scala/SimpleApp_B.scala

Hopefully with sbt package I will get two .jar files, one for each of the source programs, so I can run them separately in Spark. I tried to create two .sbt files, one for each program, but found only one .jar file is created.

./simpleA.sbt:
name := "Simple Project A"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

./simpleB.sbt:
name := "Simple Project B"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

Does anybody know how to do it? Cheers, Dan
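Following the multi-project tutorial Ted links, a sketch of a single root build.sbt that yields one jar per program; it assumes each SimpleApp is moved into its own subdirectory (simpleA/src/main/scala and simpleB/src/main/scala):

lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.10.4",
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
)

lazy val simpleA = (project in file("simpleA"))
  .settings(commonSettings: _*)
  .settings(name := "Simple Project A")

lazy val simpleB = (project in file("simpleB"))
  .settings(commonSettings: _*)
  .settings(name := "Simple Project B")

Running `sbt package` at the root then builds a jar under each subproject's target directory.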
Re: implicit function in SparkStreaming
I believe that the implicit def is pulling in the enclosing class (in which the def is defined) in the closure, which is not serializable. On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi guys, I'm puzzled why I can't use the implicit function in Spark Streaming; it causes Task not serializable. Code snippet:

implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2)
    (message(0), message(1))
  else if (message.length == 1) {
    (message(0), "")
  } else ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => {
    (s._1 == "Action" && s._2 == "TRUE")
  })
}

Could you please give me some pointers ? Thank you . -- guoqing0...@yahoo.com.hk
Re: Too many open files when using Spark to consume messages from Kafka
Also cc'ing Cody. @Cody maybe there is a reason for doing connection pooling even if there is not a performance difference. TD On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote: [...]
Re: Driver memory leak?
It could be related to this: https://issues.apache.org/jira/browse/SPARK-6737 This was fixed in Spark 1.3.1. On Wed, Apr 29, 2015 at 8:38 AM, Sean Owen so...@cloudera.com wrote: Not sure what you mean. It's already in CDH since 5.4 = 1.3.0 (This isn't the place to ask about CDH) I also don't think that's the problem. The process did not run out of memory. On Wed, Apr 29, 2015 at 2:08 PM, Serega Sheypak serega.shey...@gmail.com wrote: The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0. @Sean Will it be backported to CDH? I didn't find that bug in the CDH 5.4 release notes. 2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com: The memory leak could be related to this https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved in Spark 1.2.2 and 1.3.0. It also was a HashMap causing the issue. -Conor On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote: Please use user@, not dev@ This message does not appear to be from your driver. It also doesn't say you ran out of memory. It says you didn't tell YARN to let it use the memory you want. Look at the memory overhead param and please search first for related discussions. On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote: Hi, Dear developer, I am using Spark Streaming to read data from kafka. The program had already run for about 120 hours, but today it failed because of the driver's OOM, as follows: Container [pid=49133,containerID=container_1429773909253_0050_02_01] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 3.2 GB of 50 GB virtual memory used. Killing container. I set --driver-memory to 2g. In my mind, the driver is responsible for job scheduling and job monitoring (please correct me if I'm wrong); why is it using so much memory? So I used jmap to monitor another program (already running about 48 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256, with the result as follows: the java.util.HashMap$Entry and java.lang.Long objects are using about 600MB of memory! I also used jmap to monitor another program (running about 1 hour), and there the java.util.HashMap$Entry and java.lang.Long objects aren't using nearly as much memory. But I found that, as time goes by, the java.util.HashMap$Entry and java.lang.Long objects occupy more and more memory. Is it a driver memory leak, or is there another reason? Thanks Best Regards
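For the memory overhead parameter Sean points to, a hedged example of the relevant Spark-on-YARN knob (the value is in MB and must be tuned to the container limits; the class and jar names are placeholders):

spark-submit --master yarn-cluster \
  --driver-memory 2g \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --class com.example.Main app.jar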
Re: multiple programs compilation by sbt.
Have you looked at http://www.scala-sbt.org/0.13/tutorial/Multi-Project.html ? Cheers On Wed, Apr 29, 2015 at 2:45 PM, Dan Dong dongda...@gmail.com wrote: [...]
Re: How to stream all data out of a Kafka topic once, then terminate job?
Part of the issue is, when you read messages in a topic, the messages are peeked, not polled, so there'll be no "when the queue is empty", as I understand it. So it would seem I'd want to do KafkaUtils.createRDD, which takes an array of OffsetRange's. Each OffsetRange is characterized by topic, partition, fromOffset, and untilOffset. In my case, I want to read all data, i.e. from all partitions, and I don't know how many partitions there may be, nor do I know the 'untilOffset' values. In essence, I just want something like createRDD(new OffsetRangeAllData()); In addition, I'd ideally want the option of not peeking but polling the messages off the topics involved. But I'm not sure whether the Kafka APIs support it and then whether Spark does/will support that as well... On Wed, Apr 29, 2015 at 1:52 AM, ayan guha guha.a...@gmail.com wrote: I guess what you mean is not streaming. If you create a stream context at time t, you will receive data coming through starting at time t++, not before time t. Looks like you want a queue. Let Kafka write to a queue, consume msgs from the queue and stop when the queue is empty. On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm wondering about the use-case where you're not doing continuous, incremental streaming of data out of Kafka but rather want to publish data once with your Producer(s) and consume it once, in your Consumer, then terminate the consumer Spark job. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(...)); The batchDuration parameter is "The time interval at which streaming data will be divided into batches". Can this be worked somehow to cause Spark Streaming to just get all the available data, then let all the RDD's within the Kafka discretized stream get processed, and then just be done and terminate, rather than wait another period and try to process any more data from Kafka?
Re: HBase HTable constructor hangs
Can you verify whether the hbase release you're using has the following fix ? HBASE-11118 (non environment variable solution for IllegalAccessError) Cheers On Tue, Apr 28, 2015 at 10:47 PM, Tridib Samanta tridib.sama...@live.com wrote: [...] Here is the jstack trace:

Executor task launch worker-1 #58 daemon prio=5 os_prio=0 tid=0x7fd3d0445000 nid=0x488 waiting on condition [0x7fd4507d9000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:152)
    - locked 0xf8cb7258 (a org.apache.hadoop.hbase.client.RpcRetryingCaller)
    at org.apache.hadoop.hbase.client.HTable.getRowOrBefore(HTable.java:705)
    at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:144)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.prefetchRegionCache(HConnectionManager.java:1102)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:1162)
    - locked 0xf84ac0b0 (a java.lang.Object)
Join between Streaming data vs Historical Data in spark
Let's say I have transaction data and visit data.

visit
| userId | Visit source | Timestamp |
| A      | google ads   | 1         |
| A      | facebook ads | 2         |

transaction
| userId | total price | timestamp |
| A      | 100         | 248384    |
| B      | 200         | 43298739  |

I want to join transaction data and visit data to do sales attribution. I want to do it in real time whenever a transaction occurs (streaming). Is it scalable to do a join between one datum and very big historical data using the join function in Spark? If it is not, then how is it usually done? Visit needs to be historical, since a visit can be any time before the transaction (e.g. the visit is one year before the transaction occurs). Rendy
Re: How to stream all data out of a Kafka topic once, then terminate job?
The idea of peek vs poll doesn't apply to kafka, because kafka is not a queue. There are two ways of doing what you want, either using KafkaRDD or a direct stream. The KafkaRDD approach would require you to find the beginning and ending offsets for each partition. For an example of this, see getEarliestLeaderOffsets and getLatestLeaderOffsets in https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala For usage examples see the tests. That code isn't public, so you'd need to either duplicate it, or build a version of spark with all of the 'private[blah]' restrictions removed. The direct stream approach would require setting the kafka parameter auto.offset.reset to smallest, in order to start at the beginning. If you haven't set any rate limiting parameters, then the first batch will contain all the messages. You can then kill the job after the first batch. It's possible you may be able to kill the job from a StreamingListener.onBatchCompleted, but I've never tried and don't know what the consequences may be. On Wed, Apr 29, 2015 at 8:52 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: [...]
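A hedged sketch of Cody's second approach (a StreamingContext named ssc is assumed in scope; the broker and topic are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  "auto.offset.reset" -> "smallest"  // start at the beginning of each partition
)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))
// With no rate limiting configured, the first batch covers everything up to
// 'now'; per Cody, the job is then killed after that batch (perhaps from
// StreamingListener.onBatchCompleted, which he notes is untested).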
Re: How to group multiple row data ?
looks like you need this:

from pyspark.sql import Row

lst = [[10001, 132, 2002, 1, "2012-11-23"],
       [10001, 132, 2002, 1, "2012-11-24"],
       [10031, 102, 223, 2, "2012-11-24"],
       [10001, 132, 2002, 2, "2012-11-25"],
       [10001, 132, 2002, 3, "2012-11-26"]]

base = sc.parallelize(lst, 1).map(
    lambda x: Row(idx=x[0], num=x[1], yr=x[2], ev=x[3], dt=int(x[4].replace("-", ""))))
baseDF = ssc.createDataFrame(base)  # ssc here is the SQLContext
print baseDF.printSchema()
baseDF.registerTempTable("base")

trm = ssc.sql("select a.idx, a.num, a.yr, b.ev, a.dt from base a inner join base b "
              "on a.idx = b.idx and a.num = b.num and a.yr = b.yr "
              "where a.dt >= b.dt order by a.idx, a.num, a.yr, a.dt")

def rowtoarr(r):
    return (r.idx, r.num, r.yr, r.dt), r.ev

trmRDD = trm.map(rowtoarr).reduceByKey(lambda x, y: str(x) + "," + str(y))
for i in trmRDD.collect():
    print i

((10031, 102, 223, 20121124), 2)
((10001, 132, 2002, 20121123), 1)
((10001, 132, 2002, 20121125), '1,1,2')
((10001, 132, 2002, 20121124), '1,1')
((10001, 132, 2002, 20121126), '1,1,2,3')

On Wed, Apr 29, 2015 at 10:34 PM, Manoj Awasthi awasthi.ma...@gmail.com wrote: Sorry but I didn't fully understand the grouping. This line: "The group must only take the closest previous trigger. The first one hence shows alone." Can you please explain further? On Wed, Apr 29, 2015 at 4:42 PM, bipin bipin@gmail.com wrote: Hi, I have a ddf with schema (CustomerID, SupplierID, ProductID, Event, CreatedOn); the first 3 are Long ints, Event can only be 1, 2, 3, and CreatedOn is a timestamp. How can I make a group triplet/doublet/singlet out of them such that I can infer that a Customer registered events from 1 to 2 and, if present, to 3 timewise, preserving the number of entries. For e.g. Before processing: 10001, 132, 2002, 1, 2012-11-23 10001, 132, 2002, 1, 2012-11-24 10031, 102, 223, 2, 2012-11-24 10001, 132, 2002, 2, 2012-11-25 10001, 132, 2002, 3, 2012-11-26 (total 5 rows) After processing: 10001, 132, 2002, 2012-11-23, 1 10031, 102, 223, 2012-11-24, 2 10001, 132, 2002, 2012-11-24, 1,2,3 (total 5 in last field - comma separated!) The group must only take the closest previous trigger. The first one hence shows alone. Can this be done using spark sql ? If it needs to be processed functionally in scala, how do I do this? I can't wrap my head around this. Can anyone help. -- Best Regards, Ayan Guha
RE: How to group multiple row data ?
I think you'd probably want to look at combineByKey. I'm on my phone so can't give you an example, but that's one solution I would try. You would then take the resulting RDD and go back to a DF if needed. From: bipin bipin@gmail.com Sent: 4/29/2015 4:13 AM To: user@spark.apache.org Subject: How to group multiple row data ? [...]
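Since the reply above couldn't include one, here is a rough combineByKey sketch (my own construction, on assumed types): it gathers each (CustomerID, SupplierID, ProductID) key's (date, event) pairs into a date-sorted list, leaving the "closest previous trigger" chaining to ordinary per-key logic afterwards:

// rows: RDD[(Long, Long, Long, Int, String)] = (cust, supp, prod, event, createdOn)
val byKey = rows.map { case (c, s, p, ev, dt) => ((c, s, p), (dt, ev)) }
val grouped = byKey.combineByKey(
  (v: (String, Int)) => List(v),                               // start a list per key
  (acc: List[(String, Int)], v: (String, Int)) => v :: acc,    // fold within a partition
  (a: List[(String, Int)], b: List[(String, Int)]) => a ::: b  // merge across partitions
).mapValues(_.sortBy(_._1))  // ISO-style dates sort lexicographically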
Re: Re: solr in spark
# disclaimer I'm an employee of Elastic (the company behind Elasticsearch) and lead of the Elasticsearch Hadoop integration. Some things to clarify on the Elasticsearch side: 1. Elasticsearch is a distributed, real-time search and analytics engine. Search is just one aspect of it and it can work with any type of data (whether it's text, image encoding, etc...): Github, Wikipedia, Stackoverflow are popular examples of known websites that are powered by Elasticsearch. In fact you can find plenty of use cases and information about this on the website [1]. 2. Elasticsearch is stand-alone and can be run on the same or separate machines as other services. In fact, on the _same_ machine, one can run _multiple_ Elasticsearch nodes (and thus clusters). For best performance, having dedicated hardware (as Nick suggested) works best. 3. The Elasticsearch Spark integration has been available for over a year through Map/Reduce and through the native (Scala and Java) API since q3 last year. There are plenty of features available which are fully documented here [2]. Better yet, there's a talk by yours truly from Spark Summit East [3] that is fully focused on exactly this topic. 4. elasticsearch-hadoop is certified by Databricks, Cloudera, Hortonworks and MapR and supports both Spark core and Spark SQL 1.0-1.3. There are binaries for Scala 2.10 and 2.11. And for what it's worth, it provided one of the first (if not the first) implementations of the DataSource API outside Databricks, which means not only using Elasticsearch in a declarative fashion but also having push-down support for operators. Hopefully these materials will get you started with Spark and Elasticsearch and also clarify some of the misconceptions about Elasticsearch. Cheers, [1] https://www.elastic.co/products/elasticsearch [2] http://www.elastic.co/guide/en/elasticsearch/hadoop/master/reference.html [3] http://spark-summit.org/east/2015/talk/using-spark-and-elasticsearch-for-real-time-data-analysis On 4/28/15 8:16 PM, Nick Pentreath wrote: Depends on your use case and search volume. Typically you'd have a dedicated ES cluster if your app is doing a lot of real time indexing and search. If it's only for spark integration then you could colocate ES and spark. On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for the reply. Will the Elastic search index be within my cluster, or do I need to host Elastic search separately? On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote: I haven't used Solr for a long time, and haven't used Solr in Spark. However, why do you say "Elasticsearch is not a good option ..."? ES absolutely supports full-text search and not just filtering and grouping (in fact its original purpose was and still is text search, though filtering, grouping and aggregation are heavily used). http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com wrote: Has anyone tried using Solr inside Spark? Below is the project describing it: https://github.com/LucidWorks/spark-solr. I have a requirement in which I want to index 20 million company names and then search as and when new data comes in. The output should be a list of companies matching the query.
Spark has inbuilt elastic search, but for this purpose Elastic search is not a good option since this is totally a text search problem? Elastic search is good for filtering and grouping. Has anybody used Solr inside Spark? Regards jeetendra -- Costin
Re: Dataframe filter based on another Dataframe
You mean after joining? Sure, my question was more whether there was any best practice preferred to joining the other dataframe for filtering. Regards, Olivier. On Wed, Apr 29, 2015 at 1:23 PM, Olivier Girardot ssab...@gmail.com wrote: Hi everyone, what is the most efficient way to filter a DataFrame on a column from another Dataframe's column? The best idea I had was to join the two dataframes:

val df1: DataFrame = ...
val df2: DataFrame = ...
df1.join(df2, df1("id") === df2("id"), "inner")

But I end up (obviously) with the id column twice. Another approach would be to filter df1, but I can't seem to get this to work using df2's column as a base. Any idea? Regards, Olivier.
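One option that sidesteps the duplicated column entirely, assuming the "leftsemi" join type string (which Spark SQL accepts for DataFrame joins): a semi join returns only df1's columns, keeping exactly the df1 rows whose id appears in df2:

val filtered = df1.join(df2, df1("id") === df2("id"), "leftsemi")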
Re: Re: implicit function in SparkStreaming
Could you put the implicit def in an object? That should work, as objects are never serialized. On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: [...]
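A minimal sketch of that suggestion, assuming the conversion moves into a standalone object so no enclosing class gets captured in the closure:

import org.apache.spark.streaming.dstream.DStream

object StringConversions {
  // lives in an object: referencing it pulls in no outer instance
  implicit def str2KeyValue(s: String): (String, String) = {
    val message = s.split("\\|")
    if (message.length >= 2) (message(0), message(1))
    else if (message.length == 1) (message(0), "")
    else ("", "")
  }
}

import StringConversions._

def filter(stream: DStream[String]): DStream[String] =
  stream.filter(s => s._1 == "Action" && s._2 == "TRUE")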
Re: [Spark SQL] Problems creating a table in specified schema/database
No, sorry this is not supported. Support for more than one database is lacking in several areas (though mostly works for hive tables). I'd like to fix this in Spark 1.5. On Tue, Apr 28, 2015 at 1:54 AM, James Aley james.a...@swiftkey.com wrote: Hey all, I'm trying to create tables from existing Parquet data in different schemata. The following isn't working for me: CREATE DATABASE foo; CREATE TABLE foo.bar USING com.databricks.spark.avro OPTIONS (path '...'); -- Error: org.apache.spark.sql.AnalysisException: cannot recognize input near 'USING' 'com' '.' in table name; line 1 pos 13 (state=,code=0) I also tried USE foo; CREATE TABLE bar USING com.databricks.spark.avro OPTIONS (path '...'); -- Creates the table successfully, but in the default.* schema. This is on Spark 1.3.1, running on YARN, Hive 0.13.1. Any suggestions? Should this work? James.
RE: Sort (order by) of the big dataset
After a day of debugging (actually, more), I can answer my question: The problem is that the default value 200 of spark.sql.shuffle.partitions is too small for sorting 2B rows. It was hard to realize because Spark executors just crash with various exceptions one by one. The other takeaway is that Dataframe order by and RDD.sortBy are implemented in different ways. BTW, why? Small synthetic test (copied from my blog): Create 2B rows of MyRecord within 2000 partitions, so each partition will have 1M rows.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class MyRecord(time: Double, id: String)
val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x =>
  Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))

Let's sort this RDD by time:

val sorted = rdd.sortBy(x => x.time)
sorted.count

It finished in about 8 minutes on my cluster of 8 nodes. Everything's fine. You can also check the tasks that were completed in the Spark web UI. The number of reducers was equal to the number of partitions, i.e. 2000. Let's convert the original RDD to a Dataframe and sort again:

val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("data")
val result = sqlContext.sql("select * from data order by time")
result.count

It will run for a while and then crash. If you check tasks in the Spark Web UI, you will see that some of them were cancelled due to lost executors (ExecutorLost) due to some strange Exceptions. It is really hard to trace back which executor was first to be lost. The others follow it as in a house of cards. What's the problem? The number of reducers. For the first task it is equal to the number of partitions, i.e. 2000, but for the second it switched to 200. From: Ulanov, Alexander Sent: Wednesday, April 29, 2015 1:08 PM To: user@spark.apache.org Subject: Sort (order by) of the big dataset Hi, I have a 2 billion records dataset with schema eventId: String, time: Double, value: Double. It is stored in Parquet format in HDFS, size 23GB. Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon 16GB RAM, 1TB disk space, each node has 3 workers with 3GB memory. I keep failing to sort the mentioned dataset in Spark. I do the following:

val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
pf.registerTempTable("data")
val sorted = sqlContext.sql("select * from data order by time")
sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")

Spark starts to execute tasks and then errors like Executor Lost pop up in the web UI (tasks mapPartitions at Exchange.scala and runJob at newParquet.scala), giving different types of Exceptions in explanation. My thinking is that the main problem is the GC overhead limit exception, however I observe exceptions related to connection time out and shuffle write (java.lang.IllegalStateException: Shutdown in progress; org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException). What I tried: 1) Tried to order by eventId, and by both eventId and time, with the same result. 2) Looked at the shuffle parameters, but the defaults do make sense. 3) Tried to repartition the data I am loading from Parquet: val pf3000 = pf.repartition(3000), in order to get smaller chunks of data passed to executors (originally there are 300 partitions). It did not help either. Surprisingly this dataset takes 50GB on hdfs versus the 23GB the original took. 4) Tried to have 8 workers instead of 24 and gave them 10G of memory. It did not help. Could you suggest what might be the problem and what is the workaround?
Just in case, I cannot have more RAM or more machines :) Best regards, Alexander
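A minimal sketch of the fix Alexander's diagnosis implies, raising spark.sql.shuffle.partitions before the DataFrame sort (2000 mirrors his RDD partition count; the right value depends on the data):

// assumes the "data" temp table is registered as in the post
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val sortedDF = sqlContext.sql("select * from data order by time")
sortedDF.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")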
Re: Compute pairwise distance
Cross join shuffle space might not be needed, since most likely, through application-specific logic (topK etc.), you can cut the shuffle space... Also, the brute-force approach will most likely serve as a benchmark tool to see how much better your clustering-based KNN solution is, since there are several ways you can find approximate nearest neighbors for your application (KMeans/KDTree/LSH etc.)... There is a variant that I will bring as a PR for this JIRA, and we will of course look into how to improve it further... the idea is to think about a distributed matrix multiply where both matrices A and B are distributed and the master coordinates pulling a partition of A and multiplying it with B... The idea suffices for kernel matrix generation as well if the number of rows is modest (~10M or so)... https://issues.apache.org/jira/browse/SPARK-4823 On Wed, Apr 29, 2015 at 3:25 PM, ayan guha guha.a...@gmail.com wrote: [...]
Re: Re: implicit function in SparkStreaming
Thank you for your pointers, it's very helpful to me. In this scenario, how can I use the implicit def in the enclosing class? From: Tathagata Das Date: 2015-04-30 07:00 To: guoqing0...@yahoo.com.hk CC: user Subject: Re: implicit function in SparkStreaming I believe that the implicit def is pulling the enclosing class (in which the def is defined) into the closure, and that class is not serializable. On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi guys, I'm puzzled why using the implicit function in Spark Streaming causes Task not serializable. Code snippet:

implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2) (message(0), message(1))
  else if (message.length == 1) (message(0), "")
  else ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => s._1 == "Action" && s._2 == "TRUE")
}

Could you please give me some pointers? Thank you. guoqing0...@yahoo.com.hk
spark kryo serialization question
Hi all, We know that Spark supports Kryo serialization. Suppose there is a map function which maps c to (k, v) (here c, k, v are instances of classes C, K, V). When we register for Kryo serialization, should I register all three of these classes? Best Wishes, Triones Deng
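A minimal sketch of the registration in question, with placeholder case classes standing in for the real C, K and V (the assumption being that instances of all three cross the wire or get cached, so all three are worth registering):

import org.apache.spark.SparkConf

case class C(payload: String)
case class K(key: String)
case class V(value: Double)

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[C], classOf[K], classOf[V]))

Kryo will still serialize unregistered classes, writing the full class name with each record, so a missing registration costs space rather than correctness, unless spark.kryo.registrationRequired is set to true.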
Re: How to install spark in spark on yarn mode
Hi, Follow the instructions to install at the following link: http://mbonaci.github.io/mbo-spark/ You don't need to install Spark on every node. Just install it on one node, or you can install it on a remote system and make a Spark cluster. Thanks Madhvi On Thursday 30 April 2015 09:31 AM, xiaohe lan wrote: Hi experts, I see Spark on YARN has yarn-client and yarn-cluster modes. I also have a 5-node Hadoop cluster (Hadoop 2.4). How do I install Spark if I want to try the Spark-on-YARN mode? Do I need to install Spark on each node of the Hadoop cluster? Thanks, Xiaohe
Re: Re: implicit function in SparkStreaming
For your question, I think the discussion in this link can help: http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-td6801.html Best regards, Lin Hao XU IBM Research China Email: xulin...@cn.ibm.com My Flickr: http://www.flickr.com/photos/xulinhao/sets

From: guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk To: Tathagata Das t...@databricks.com Cc: user user@spark.apache.org Date: 2015/04/30 11:17 Subject: Re: Re: implicit function in SparkStreaming

Appreciate your help, it works. I'm curious why the enclosing class cannot be serialized. Does it need to extend java.io.Serializable? If an object is never serialized, how does it work in the task? Is there any association with spark.closure.serializer? guoqing0...@yahoo.com.hk

From: Tathagata Das Date: 2015-04-30 09:30 To: guoqing0...@yahoo.com.hk CC: user Subject: Re: Re: implicit function in SparkStreaming Could you put the implicit def in an object? That should work, as objects are never serialized. On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Thank you for your pointers, it's very helpful to me. In this scenario, how can I use the implicit def in the enclosing class?

From: Tathagata Das Date: 2015-04-30 07:00 To: guoqing0...@yahoo.com.hk CC: user Subject: Re: implicit function in SparkStreaming I believe that the implicit def is pulling the enclosing class (in which the def is defined) into the closure, and that class is not serializable. On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi guys, I'm puzzled why using the implicit function in Spark Streaming causes Task not serializable. Code snippet:

implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2) (message(0), message(1))
  else if (message.length == 1) (message(0), "")
  else ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => s._1 == "Action" && s._2 == "TRUE")
}

Could you please give me some pointers? Thank you. guoqing0...@yahoo.com.hk
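Following TD's suggestion, a minimal sketch of moving the implicit def into a standalone object (the code is the poster's, lightly cleaned up; the object name is mine):

import org.apache.spark.streaming.dstream.DStream

object Conversions {
  implicit def str2KeyValue(s: String): (String, String) = {
    val message = s.split("\\|")
    if (message.length >= 2) (message(0), message(1))
    else if (message.length == 1) (message(0), "")
    else ("", "")
  }
}

def filter(stream: DStream[String]): DStream[String] = {
  import Conversions._
  // The closure now references only the object, which each executor loads
  // locally instead of receiving via serialization.
  stream.filter(s => s._1 == "Action" && s._2 == "TRUE")
}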
Event generator for SPARK-Streaming from csv
I have the real DEBS taxi data in a CSV file. In order to operate over it, how do I simulate a Spout-like event generator that replays events using the timestamps in the CSV file? -- SERC-IISC Thanks Regards, Anshu Shukla
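One hedged way to fake such a generator (my own sketch, not from the thread): pre-bucket the CSV by its timestamp column and feed one bucket per batch interval to StreamingContext.queueStream. The file path and timestamp column position are hypothetical, and the collect() assumes the bucketed data fits on the driver:

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(1))
val queue = new mutable.Queue[RDD[String]]()
ssc.queueStream(queue, oneAtATime = true).print()

// Bucket records by an assumed epoch-second timestamp in column 0
val byTime = sc.textFile("hdfs:///data/debs-taxi.csv")
  .map(line => (line.split(",")(0).toLong, line))
  .groupByKey()
  .sortByKey()
  .collect()

ssc.start()
for ((ts, records) <- byTime) {
  queue += sc.parallelize(records.toSeq)  // one timestamp bucket per batch
  Thread.sleep(1000)
}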
RE: How can I merge multiple DataFrame and remove duplicated key
As I understand from SQL, group by allows you to do sum(), average(), max(), min(). But how do I select the entire row in the group with the maximum timeStamp column? For example:

id1, value1, 2015-01-01
id1, value2, 2015-01-02
id2, value3, 2015-01-01
id2, value4, 2015-01-02

I want to return:

id1, value2, 2015-01-02
id2, value4, 2015-01-02

I can use reduceByKey() on an RDD, but how do I do it using DataFrame? Can you give an example code snippet? Thanks Ningjun

From: ayan guha [mailto:guha.a...@gmail.com] Sent: Wednesday, April 29, 2015 5:54 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How can I merge multiple DataFrame and remove duplicated key

It's no different; you would use group by and an aggregate function to do so. On 30 Apr 2015 02:15, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have multiple DataFrame objects, each stored in a parquet file. Each DataFrame just contains 3 columns (id, value, timeStamp). I need to union all the DataFrame objects together, but for a duplicated id only keep the record with the latest timeStamp. How can I do that? I can do this for RDDs: sc.union() to union all the RDDs, then reduceByKey() to remove duplicated ids, keeping only the one with the latest timeStamp field. But how do I do it for DataFrame? Ningjun
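Beyond Ayan's group-by pointer, one sketch that works on the 1.3-era DataFrame API: compute the latest timeStamp per id, then join back to recover the full row. df1, df2, df3 stand for the frames loaded from the parquet files, and ties on timeStamp would keep both rows:

import org.apache.spark.sql.functions.max

val all = df1.unionAll(df2).unionAll(df3)
val latest = all.groupBy("id").agg(max("timeStamp").as("maxTs"))
val deduped = all
  .join(latest, all("id") === latest("id") && all("timeStamp") === latest("maxTs"))
  .select(all("id"), all("value"), all("timeStamp"))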
Re: Too many open files when using Spark to consume messages from Kafka
Use lsof to see what files are actually being held open. That stack trace looks to me like it's from the driver, not the executors. Where in foreach is it being called? The outermost portion of foreachRDD runs in the driver; the innermost portion runs in the executors. From the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record)  // executed at the worker
  }
}

@td I've specifically looked at kafka socket connections for the standard 1.3 code vs my branch that has cached connections. The standard non-caching code has very short-lived connections. I've had jobs running for a month at a time, including ones writing to mysql. Not saying it's impossible, but I'd think we need some evidence before speculating this has anything to do with it.

On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay bill.jaypeter...@gmail.com wrote: This function is called in foreachRDD. I think it should be running in the executors. I added the statement.close() in the code and it is running. I will let you know if this fixes the issue.

On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das t...@databricks.com wrote: Is the function ingestToMysql running on the driver or on the executors? Accordingly you can try debugging while running in a distributed manner, with and without calling the function. If you don't get too many open files without calling ingestToMysql(), the problem is likely to be in ingestToMysql(). If you get the problem even without calling ingestToMysql(), then the problem may be in Kafka. If the problem is occurring in the driver, then it's the DirectKafkaInputDStream code. If the problem is occurring in the executor, then the problem is in KafkaRDD. TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu yuzhih...@gmail.com wrote: Maybe add statement.close() in a finally block? Streaming / Kafka experts may have better insight.

On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Thanks for the suggestion. I ran the command and the limit is 1024. Based on my understanding, the connector to Kafka should not open so many files. Do you think there is possible socket leakage? BTW, in every batch, which is 5 seconds, I output some results to mysql:

def ingestToMysql(data: Array[String]) {
  val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
  var sql = "insert into loggingserver1 values "
  data.foreach(line => sql += line)
  sql = sql.dropRight(1)
  sql += ";"
  logger.info(sql)
  var conn: java.sql.Connection = null
  try {
    conn = DriverManager.getConnection(url)
    val statement = conn.createStatement()
    statement.executeUpdate(sql)
  } catch {
    case e: Exception => logger.error(e.getMessage())
  } finally {
    if (conn != null) {
      conn.close
    }
  }
}

I am not sure whether the leakage originates from the Kafka connector or the SQL connections. Bill

On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu yuzhih...@gmail.com wrote: Can you run the command 'ulimit -n' to see the current limit?
To configure ulimit settings on Ubuntu, edit /etc/security/limits.conf. Cheers

On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am using the direct approach to receive real-time data from Kafka, following this link: https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html My code follows the word count direct example: https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala After around 12 hours, I got the following error messages in the Spark log:

15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 143033869 ms
org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at
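For completeness, here is the per-partition connection pattern from the streaming guide Cody linked, which keeps connections (and hence file descriptors) bounded; Connection and createNewConnection are hypothetical stand-ins for a real factory such as a JDBC pool:

import org.apache.spark.streaming.dstream.DStream

trait Connection { def send(record: String): Unit; def close(): Unit }
def createNewConnection(): Connection = ???  // hypothetical connection factory

def sink(dstream: DStream[String]): Unit =
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      val connection = createNewConnection()  // runs on the worker, once per partition
      partition.foreach(record => connection.send(record))
      connection.close()  // closing in the same task prevents descriptor leaks
    }
  }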
How to install spark in spark on yarn mode
Hi experts, I see Spark on YARN has yarn-client and yarn-cluster modes. I also have a 5-node Hadoop cluster (Hadoop 2.4). How do I install Spark if I want to try the Spark-on-YARN mode? Do I need to install Spark on each node of the Hadoop cluster? Thanks, Xiaohe
Re: Re: implicit function in SparkStreaming
Appreciate your help, it works. I'm curious why the enclosing class cannot be serialized. Does it need to extend java.io.Serializable? If an object is never serialized, how does it work in the task? Is there any association with spark.closure.serializer? guoqing0...@yahoo.com.hk

From: Tathagata Das Date: 2015-04-30 09:30 To: guoqing0...@yahoo.com.hk CC: user Subject: Re: Re: implicit function in SparkStreaming Could you put the implicit def in an object? That should work, as objects are never serialized. On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Thank you for your pointers, it's very helpful to me. In this scenario, how can I use the implicit def in the enclosing class?

From: Tathagata Das Date: 2015-04-30 07:00 To: guoqing0...@yahoo.com.hk CC: user Subject: Re: implicit function in SparkStreaming I believe that the implicit def is pulling the enclosing class (in which the def is defined) into the closure, and that class is not serializable. On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi guys, I'm puzzled why using the implicit function in Spark Streaming causes Task not serializable. Code snippet:

implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2) (message(0), message(1))
  else if (message.length == 1) (message(0), "")
  else ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => s._1 == "Action" && s._2 == "TRUE")
}

Could you please give me some pointers? Thank you. guoqing0...@yahoo.com.hk
Re: How to stream all data out of a Kafka topic once, then terminate job?
Thanks for the comments, Cody. Granted, Kafka topics aren't queues. I was merely wishing that Kafka's topics had some queue behaviors supported, because often that is exactly what one wants. The ability to poll messages off a topic seems like something lots of use cases would want. I'll explore both of the approaches you mentioned. For now, I see that using the KafkaRDD approach means finding partitions and offsets. My thinking was that it'd be nice if there were a convenience in the API that would wrap this logic and expose it as a method. For the second approach, I'll need to see where the listener is grafted on and whether it would have enough ability to kill the whole job. There's the stop method on the context, so perhaps if the listener could grab hold of the context it could invoke stop() on it.

On Wed, Apr 29, 2015 at 10:26 AM, Cody Koeninger c...@koeninger.org wrote: The idea of peek vs poll doesn't apply to Kafka, because Kafka is not a queue. There are two ways of doing what you want, either using KafkaRDD or a direct stream. The KafkaRDD approach would require you to find the beginning and ending offsets for each partition. For an example of this, see getEarliestLeaderOffsets and getLatestLeaderOffsets in https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala For usage examples, see the tests. That code isn't public, so you'd need to either duplicate it, or build a version of Spark with all of the 'private[blah]' restrictions removed. The direct stream approach would require setting the Kafka parameter auto.offset.reset to smallest, in order to start at the beginning. If you haven't set any rate limiting parameters, then the first batch will contain all the messages. You can then kill the job after the first batch. It's possible you may be able to kill the job from a StreamingListener.onBatchCompleted, but I've never tried and don't know what the consequences may be.

On Wed, Apr 29, 2015 at 8:52 AM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Part of the issue is that when you read messages in a topic, the messages are peeked, not polled, so there'll be no "when the queue is empty", as I understand it. So it would seem I'd want to do KafkaUtils.createRDD, which takes an array of OffsetRanges. Each OffsetRange is characterized by topic, partition, fromOffset, and untilOffset. In my case, I want to read all data, i.e. from all partitions, and I don't know how many partitions there may be, nor do I know the untilOffset values. In essence, I just want something like createRDD(new OffsetRangeAllData()). In addition, I'd ideally want the option of not peeking but polling the messages off the topics involved. But I'm not sure whether the Kafka APIs support it, and then whether Spark does/will support that as well...

On Wed, Apr 29, 2015 at 1:52 AM, ayan guha guha.a...@gmail.com wrote: I guess what you mean is not streaming. If you create a streaming context at time t, you will receive data coming through starting at time t++, not before time t. Looks like you want a queue. Let Kafka write to a queue, consume msgs from the queue and stop when the queue is empty.

On 29 Apr 2015 14:35, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I'm wondering about the use case where you're not doing continuous, incremental streaming of data out of Kafka, but rather want to publish data once with your Producer(s) and consume it once in your Consumer, then terminate the consumer Spark job.
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(...)); The batchDuration parameter is "the time interval at which streaming data will be divided into batches". Can this be worked somehow to cause Spark Streaming to just get all the available data, then let all the RDDs within the Kafka discretized stream get processed, and then just be done and terminate, rather than wait another period and try to process any more data from Kafka?
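For the once-off read, a rough sketch of the KafkaRDD approach Cody describes above; the broker address, topic, partition count and offsets are all hypothetical, and in practice the offsets would be discovered via getEarliestLeaderOffsets / getLatestLeaderOffsets as he notes:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// One range per partition: topic, partition, fromOffset, untilOffset
val offsetRanges = Array(
  OffsetRange("mytopic", 0, 0L, 1000L),
  OffsetRange("mytopic", 1, 0L, 1000L)
)
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
rdd.map(_._2).saveAsTextFile("hdfs:///kafka-dump")  // one batch read, no streaming context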
Re: solr in spark
On 4/29/15 6:02 PM, Jeetendra Gangele wrote: Thanks for the detailed explanation. My only worry is that searching all combinations of company names through ES looks hard. I'm not sure what makes you think ES looks hard. Have you tried browsing the Elasticsearch reference or the definitive guide? [1] http://www.elastic.co/guide/en/elasticsearch/reference/current/index.html [2] http://www.elastic.co/guide/en/elasticsearch/guide/current/index.html In Solr we define everything in XML files, like all the attributes in WordDocumentFilterFactory and the shingle factory. How do we do this in Elasticsearch? See the links above, the IRC channel, or the mailing list. I don't want to derail this thread any longer, so I'll wrap up by pointing to two of the many resources that pop up on Google: a blog post on shingles, and a post from Found.no on text analysis and shingles: https://www.elastic.co/blog/searching-with-shingles https://www.found.no/foundation/text-analysis-part-1/#optimizing-phrase-searches-with-shingles If you need more help, do reach out to the Elasticsearch mailing list: https://www.elastic.co/community Cheers,

On 29 April 2015 at 20:03, Costin Leau costin.l...@gmail.com wrote: # disclaimer: I'm an employee of Elastic (the company behind Elasticsearch) and lead of the Elasticsearch Hadoop integration. Some things to clarify on the Elasticsearch side:

1. Elasticsearch is a distributed, real-time search and analytics engine. Search is just one aspect of it, and it can work with any type of data (whether it's text, image encodings, etc.): GitHub, Wikipedia and Stack Overflow are popular examples of well-known websites that are powered by Elasticsearch. In fact, you can find plenty of use cases and information about this on the website [1].
2. Elasticsearch is stand-alone and can be run on the same or separate machines as other services. In fact, on the _same_ machine, one can run _multiple_ Elasticsearch nodes (and thus clusters). For best performance, having dedicated hardware (as Nick suggested) works best.
3. The Elasticsearch Spark integration has been available for over a year through Map/Reduce, and through the native (Scala and Java) API since Q3 last year. There are plenty of features available, fully documented here [2]. Better yet, there's a talk by yours truly from Spark Summit East [3] that is fully focused on exactly this topic.
4. elasticsearch-hadoop is certified by Databricks, Cloudera, Hortonworks and MapR, and supports both Spark Core and Spark SQL 1.0-1.3. There are binaries for Scala 2.10 and 2.11. And for what it's worth, it provided one of the first (if not the first) implementations of the DataSource API outside Databricks, which means not only using Elasticsearch in a declarative fashion but also having push-down support for operators.

Hopefully these materials will get you started with Spark and Elasticsearch and also clarify some of the misconceptions about Elasticsearch. Cheers,

[1] https://www.elastic.co/products/elasticsearch
[2] http://www.elastic.co/guide/en/elasticsearch/hadoop/master/reference.html
[3] http://spark-summit.org/east/2015/talk/using-spark-and-elasticsearch-for-real-time-data-analysis

On 4/28/15 8:16 PM, Nick Pentreath wrote: Depends on your use case and search volume. Typically you'd have a dedicated ES cluster if your app is doing a lot of real-time indexing and search.
If it's only for Spark integration then you could colocate ES and Spark. On Tue, Apr 28, 2015 at 6:41 PM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks for the reply. Will the Elasticsearch index be within my cluster, or do I need to host Elasticsearch separately? On 28 April 2015 at 22:03, Nick Pentreath nick.pentre...@gmail.com wrote: I haven't used Solr for a long time, and haven't used Solr in Spark. However, why do you say Elasticsearch is not a good option...? ES absolutely supports full-text search, not just filtering and grouping (in fact, its original purpose was, and still is, text search, though filtering, grouping and aggregation are heavily used). http://www.elastic.co/guide/en/elasticsearch/guide/master/full-text-search.html On Tue, Apr 28, 2015 at 6:27 PM, Jeetendra Gangele gangele...@gmail.com wrote: Has anyone tried using Solr inside Spark? Below is the