Querying JSON in Spark SQL
Is there any documentation that explains how to query JSON documents using SparkSQL? Thanks, Fatma
Re: Using TF-IDF from MLlib
FWIW the JIRA I was thinking about is https://issues.apache.org/jira/browse/SPARK-3098 On Mon, Mar 16, 2015 at 6:10 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: I vaguely remember that JIRA, and AFAIK Matei's point was that the order is not guaranteed *after* a shuffle. If you only use operations like map which preserve partitioning, ordering should be guaranteed from what I know. On Mon, Mar 16, 2015 at 6:06 PM, Sean Owen so...@cloudera.com wrote: Dang, I can't seem to find the JIRA now, but I am sure we had a discussion with Matei about this and the conclusion was that RDD order is not guaranteed unless a sort is involved. On Mar 17, 2015 12:14 AM, Joseph Bradley jos...@databricks.com wrote: This was brought up again in https://issues.apache.org/jira/browse/SPARK-6340 so I'll answer one item which was asked about: the reliability of zipping RDDs. Basically, it should be reliable, and if it is not, then it should be reported as a bug. This general approach should work (with explicit types to make it clear): val data: RDD[LabeledPoint] = ... val labels: RDD[Double] = data.map(_.label) val features1: RDD[Vector] = data.map(_.features) val features2: RDD[Vector] = new HashingTF(numFeatures=100).transform(features1) val features3: RDD[Vector] = idfModel.transform(features2) val finalData: RDD[LabeledPoint] = labels.zip(features3).map { case (label, features) => LabeledPoint(label, features) } If you run into problems with zipping like this, please report them! Thanks, Joseph On Mon, Dec 29, 2014 at 4:06 PM, Xiangrui Meng men...@gmail.com wrote: Hopefully the new pipeline API addresses this problem. We have a code example here: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala -Xiangrui On Mon, Dec 29, 2014 at 5:22 AM, andy petrella andy.petre...@gmail.com wrote: Here is what I did for this case: https://github.com/andypetrella/tf-idf On Mon, Dec 29, 2014 at 11:31 AM, Sean Owen so...@cloudera.com wrote: Given (label, terms) you can just transform the values to a TF vector, then a TF-IDF vector, with HashingTF and IDF / IDFModel. Then you can make a LabeledPoint from the (label, vector) pairs. Is that what you're looking for? On Mon, Dec 29, 2014 at 3:37 AM, Yao y...@ford.com wrote: I found the TF-IDF feature extraction and all the MLlib code that works with pure Vector RDDs very difficult to work with, due to the lack of ability to associate a vector back to the original data. Why can't Spark MLlib support LabeledPoint?
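A compact, untested sketch of the approach Sean and Joseph describe above, keeping the label attached through the TF-IDF transformation (the input RDD and the numFeatures value are placeholders):

  import org.apache.spark.SparkContext._
  import org.apache.spark.mllib.feature.{HashingTF, IDF}
  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  // labeled, tokenized documents: (label, terms)
  val docs: RDD[(Double, Seq[String])] = ???

  val hashingTF = new HashingTF(numFeatures = 1 << 18)
  // term-frequency vectors, with the label kept alongside each vector
  val tf: RDD[(Double, Vector)] = docs.mapValues(terms => hashingTF.transform(terms))
  tf.cache()   // so the two passes below see exactly the same data

  val idfModel = new IDF().fit(tf.values)
  val tfidf: RDD[Vector] = idfModel.transform(tf.values)

  // no shuffle happens between the two branches, so zip pairs the elements back up correctly
  val training: RDD[LabeledPoint] =
    tf.keys.zip(tfidf).map { case (label, vector) => LabeledPoint(label, vector) }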
question regarding the dependency DAG in Spark
Hi guys, I am trying to get a better understanding of the DAG generation for a job in Spark. Ideally, what I want is to run some SQL query and extract the DAG generated by Spark. By DAG I mean the stages, the dependencies among stages, and the number of tasks in every stage. Could you guys point me to the code where that happens? Thank you, Robert
Re: Using TF-IDF from MLlib
This was brought up again in https://issues.apache.org/jira/browse/SPARK-6340 so I'll answer one item which was asked about: the reliability of zipping RDDs. Basically, it should be reliable, and if it is not, then it should be reported as a bug. This general approach should work (with explicit types to make it clear): val data: RDD[LabeledPoint] = ... val labels: RDD[Double] = data.map(_.label) val features1: RDD[Vector] = data.map(_.features) val features2: RDD[Vector] = new HashingTF(numFeatures=100).transform(features1) val features3: RDD[Vector] = idfModel.transform(features2) val finalData: RDD[LabeledPoint] = labels.zip(features3).map { case (label, features) => LabeledPoint(label, features) } If you run into problems with zipping like this, please report them! Thanks, Joseph
Re: order preservation with RDDs
For those still interested, I raised this issue on JIRA and received an official response: https://issues.apache.org/jira/browse/SPARK-6340
Re: Using TF-IDF from MLlib
Dang, I can't seem to find the JIRA now, but I am sure we had a discussion with Matei about this and the conclusion was that RDD order is not guaranteed unless a sort is involved.
version conflict common-net
Hi Folks, I have a situation where I am getting a version conflict between Java libraries used by my application and ones used by Spark. Following are the details - I use the Spark provided by Cloudera running on a CDH 5.3.2 cluster (Spark 1.2.0-cdh5.3.2). The library that is causing the conflict is commons-net. In our Spark application we use commons-net version 3.3. However, I found out that Spark uses commons-net version 2.2. Hence when we try to submit our application using spark-submit, I end up getting a NoSuchMethodError: Error starting receiver 5 - java.lang.NoSuchMethodError: org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V at ZipStream.onStart(ZipStream.java:55) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269) . Now, if I change the commons-net version to 2.2, the job runs fine (except for the fact that some of the features we use from commons-net 3.3 are not there). How does one resolve such an issue where Spark uses one set of libraries and our application requires the same libraries at a different version (in my case commons-net 2.2 vs 3.3)? I see that there is a setting I can supply - spark.files.userClassPathFirst - but the documentation says it is experimental, and for us it did not work at all. Thanks in advance. Regards, -Jacob
Spark from S3 very slow
I am seeing extremely slow performance from Spark 1.2.1 (MapR 4) on Hadoop 2.5.1 (YARN) on Hive external tables on s3n. I am running a 'select count(*) from s3_table' query on the nodes using Hive 0.13 and Spark SQL 1.2.1. I am running a 5 node cluster on EC2 c3.2xlarge, MapR 4.0.2 M3. The table is 100M rows and 25GB, stored as a Hive table on S3 in 250MB splits (100 splits). Setup (on the same cluster): Hive: 14 VCPU and 25GB reserved RAM. Spark: 40 cores and 96GB. Query: SELECT count(*) FROM table; Hive from local HDFS: *70s* Spark from local HDFS: *40s* (I feel this is slow as well) Hive from S3n: *15m* Spark from S3n: *2.2h* As you can see, the same query on Spark takes over 2 hours to complete with 5 slaves. Here are some metrics from a 1.2h run (I canceled at 50%) on 5 slaves with 16GB and 8 CPUs per node:

  Metric                     | Min    | 25th percentile | Median | 75th percentile | Max
  Duration                   | 53 s   | 4.1 min         | 5.9 min| 10 min          | 22 min
  Scheduler Delay            | 5 ms   | 8 ms            | 9 ms   | 10 ms           | 51 ms
  Task Deserialization Time  | 0 ms   | 1 ms            | 1 ms   | 1 ms            | 53 ms
  GC Time                    | 30 ms  | 86 ms           | 0.1 s  | 0.2 s           | 0.4 s
  Result Serialization Time  | 0 ms   | 0 ms            | 0 ms   | 0 ms            | 1 ms
  Getting Result Time        | 0 ms   | 0 ms            | 0 ms   | 0 ms            | 0 ms
  Input                      | 3.4 MB | 3.8 MB          | 3.8 MB | 32.0 MB         | 32.0 MB
  Shuffle Write              | 51.0 B | 51.0 B          | 51.0 B | 51.0 B          | 51.0 B

The only errors I am seeing in the Spark logs are occasional socket timeouts (reading from S3). Here is what the tasks are logging to my console at INFO: 15/03/11 23:03:05 INFO rdd.HadoopRDD: Input split: s3n://bucket/warehouse/table/part-m-00017:536870912+67108864 15/03/11 23:03:05 INFO s3n.S3NativeFileSystem: Opening 's3n://bucket/warehouse/table/part-m-00017' for reading 15/03/11 23:03:05 INFO s3n.S3NativeFileSystem: Stream for key 'warehouse/table/part-m-00017' seeking to position '469762048' 15/03/11 23:03:06 INFO s3n.S3NativeFileSystem: Stream for key 'warehouse/table/part-m-00017' seeking to position '536870912' 15/03/11 22:50:09 INFO s3n.S3NativeFileSystem: Received Exception while reading 'warehouse/table/part-m-00029', will retry by attempting to reopen stream.
java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at sun.security.ssl.InputRecord.readFully(InputRecord.java:442) at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554) at sun.security.ssl.InputRecord.read(InputRecord.java:509) at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927) at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884) at sun.security.ssl.AppInputStream.read(AppInputStream.java:102) at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:204) at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:182) at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:138) at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73) at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151) at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73) at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73) at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73) at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151) at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73) at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:108) at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:73) at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem$NativeS3FsInputStream.read(S3NativeFileSystem.java:231) at java.io.BufferedInputStream.read1(BufferedInputStream.java:273) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at java.io.DataInputStream.read(DataInputStream.java:100) at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:244) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:210) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:131) at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:128) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618) at
Garbage stats in Random Forest leaf node?
I dumped the trees in the random forest model and occasionally saw a leaf node with strange stats: - pred=1.00 prob=0.80 imp=-1.00 gain=-17976931348623157.00 Here impurity = -1 and gain = a giant negative number. Normally, I would get a None from Node.stats at a leaf node. Here it printed because Some(s) matches: node.stats match { case Some(s) => println(" imp=%f gain=%f".format(s.impurity, s.gain)) case None => println() } Is it a bug? This doesn't seem to happen in the model from DecisionTree, but my data sets are limited.
Suggestion for user logging
Hi, When you submit a jar to the Spark cluster, it is very difficult to see the logging. Is there any way to save the logging to a file? I mean only the logging I created, not the Spark log information. Thanks, David
Re: Using TF-IDF from MLlib
I vaguely remember that JIRA, and AFAIK Matei's point was that the order is not guaranteed *after* a shuffle. If you only use operations like map which preserve partitioning, ordering should be guaranteed from what I know.
Re: why generateJob is a private API?
It was not really meant to be public and overridden, because anything you want to do to generate jobs from RDDs can be done using DStream.foreachRDD. On Sun, Mar 15, 2015 at 11:14 PM, madhu phatak phatak@gmail.com wrote: Hi, I am trying to create a simple subclass of DStream. If I understand correctly, I should override *compute* for lazy operations and *generateJob* for actions. But when I try to override generateJob, it gives an error saying the method is private to the streaming package. Is my approach correct, or am I missing something? Regards, Madhukara Phatak http://datamantra.io/
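A rough, untested sketch of the foreachRDD alternative mentioned above (the `events` stream and the filtering logic are placeholders; `events` is assumed to be a DStream[String]):

  events.foreachRDD { rdd =>
    // anything you would have triggered from generateJob can be run here;
    // actions on the RDD produce the jobs for this batch
    val nonEmpty = rdd.filter(_.nonEmpty).count()
    println(s"records in this batch: $nonEmpty")
  }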
Re: [SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.
See this thread: http://search-hadoop.com/m/JW1q5Kk8Zs1 You can find Spark built against multiple hadoop releases in: http://people.apache.org/~pwendell/spark-1.3.0-rc3/ FYI On Mon, Mar 16, 2015 at 11:36 AM, Shuai Zheng szheng.c...@gmail.com wrote: And it is an NoSuchMethodError, not a classnofound error And default I think the spark is only compile against Hadoop 2.2? For this issue itself, I just check the latest spark (1.3.0), its version can work (because it package with a newer version of httpclient, I can see the method is there, although still don’t know the exact version), but this doesn’t really solve the whole problem, it is very unclear that what version of third party library is used by Spark even there is someway to figure it out, still a horrible decision to do that? *From:* Ted Yu [mailto:yuzhih...@gmail.com] *Sent:* Monday, March 16, 2015 1:06 PM *To:* Shuai Zheng *Cc:* user *Subject:* Re: [SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator. From my local maven repo: $ jar tvf ~/.m2/repository/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar | grep SchemeRegistry 1373 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/impl/conn/SchemeRegistryFactory.class 2954 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/conn/scheme/SchemeRegistry.class 2936 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/auth/AuthSchemeRegistry.class If you run mvn dependency:tree, you would see something similar to the following: [INFO] | +- org.apache.hadoop:hadoop-client:jar:2.6.0:compile [INFO] | | +- org.apache.hadoop:hadoop-common:jar:2.6.0:compile [INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile [INFO] | | | +- xmlenc:xmlenc:jar:0.52:compile [INFO] | | | +- commons-io:commons-io:jar:2.4:compile [INFO] | | | +- commons-collections:commons-collections:jar:3.2.1:compile [INFO] | | | +- commons-lang:commons-lang:jar:2.6:compile [INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile [INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile [INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile [INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile [INFO] | | | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile [INFO] | | | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8:compile [INFO] | | | +- org.apache.avro:avro:jar:1.7.6:compile [INFO] | | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile [INFO] | | | +- com.google.code.gson:gson:jar:2.2.4:compile [INFO] | | | +- org.apache.hadoop:hadoop-auth:jar:2.6.0:compile [INFO] | | | | +- org.apache.httpcomponents:httpclient:jar:4.2.5:compile Cheers On Mon, Mar 16, 2015 at 9:38 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am running Spark 1.2.1 and AWS SDK. 
To make sure the AWS SDK is compatible with httpclient 4.2 (which I assume Spark uses?), I have already downgraded it to version 1.9.0. But even then, I still got an error: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:102) at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:190) at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119) at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:410) at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:392) at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:376) When I searched the mailing list, it looks like the same issue as: https://github.com/apache/spark/pull/2535 http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi But I don't understand the solution mentioned there. The issue is caused by a pre-packaged DefaultClientConnectionOperator in the Spark all-in-one jar file which doesn't have that method. I have some questions here: how can we find out which exact versions Spark pre-packages (this is really very painful), and how can we override them? I have tried: *val* conf =
Re: Process time series RDD after sortByKey
Hi Shuai, yup, that is exactly what I meant -- implement your own class MyGroupingRDD. This is definitely more detail than a lot of users will need to go into, but it's also not all that scary either. In this case, you want something that is *extremely* close to the existing CoalescedRDD, so start by looking at that code. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala The only thing which is complicated in CoalescedRDD is the PartitionCoalescer, but that is completely irrelevant for you, so you can ignore it. I started writing up a description of what to do but then I realized just writing the code would be easier :) Totally untested, but here you go: https://gist.github.com/squito/c2d1dd5413a60830d6f3 The only really interesting part here is getPartitions: https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31 That's where you create partitions in your new RDD, which depend on multiple partitions from the parent. Also note that compute() is very simple: you just concatenate together the iterators from each of the parent partitions: https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37 Let me know how it goes! On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi Imran, I am a bit confused here. Assume I have RDD a with 1000 partitions which has also been sorted. How can I control, when creating RDD b (with 20 partitions), to make sure partitions 1-50 of RDD a map to the 1st partition of RDD b? I don't see any control code/logic here. Your code below: val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) Does it mean I need to define/develop my own MyGroupingRDD class? I am not very clear how to do that; is there any place I can find an example? I have never created my own RDD class before (not an RDD instance :)). But this is a very valuable approach to me, so I am eager to learn. Regards, Shuai From: Imran Rashid [mailto:iras...@cloudera.com] Sent: Monday, March 16, 2015 11:22 AM To: Shawn Zheng; user@spark.apache.org Subject: Re: Process time series RDD after sortByKey Hi Shuai, On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote: Sorry I responded late. Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partitions, the best case is I can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc. If we have the ability to do this, we will gain huge flexibility when we try to process time-series-like data, and a lot of algorithms will benefit from it. Yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD -- it won't create any tasks, it's just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. It's just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions: val rawData1000Partitions = sc.textFile(...) // or whatever val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc. Note that this is almost exactly the same as what CoalescedRDD does. However, it might combine the partitions in whatever ways it feels like -- you want them combined in a very particular order. So you'll need to create your own subclass. Back to Zhan Zhang's: while (iterPartition < RDD.partitions.length) { val res = sc.runJob(this, (it: Iterator[T]) => someFunc, iterPartition, allowLocal = true) // Some other function after processing one partition. iterPartition += 1 } I am curious how Spark processes this without parallelism. Will the individual partition be passed back to the driver to process, or will it just run one task on the node where that partition exists, followed by another partition on another node? Not exactly. The partition is not shipped back to the driver. You create a task which will be processed by a worker. The task scheduling will take data locality into account, so ideally the task will get scheduled in the same location where the data already resides. The worker will execute someFunc, and after it's done it will ship the *result* back to the driver. Then the process will get repeated for all the other partitions. If you wanted all the data sent back
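For readers who don't follow the gist link, here is a minimal, untested sketch of a grouping RDD along the lines Imran describes (class and field names are made up; it simply concatenates every groupSize consecutive parent partitions into one output partition):

  import scala.reflect.ClassTag
  import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
  import org.apache.spark.rdd.RDD

  // One output partition, remembering which parent partitions it covers.
  class GroupedPartition(val index: Int, val parentIndices: Seq[Int]) extends Partition

  // Groups every `groupSize` consecutive partitions of `prev` into one partition,
  // preserving their order (e.g. groupSize = 50 turns 1000 partitions into 20).
  class MyGroupingRDD[T: ClassTag](prev: RDD[T], groupSize: Int)
      extends RDD[T](prev.context, Nil) {

    override protected def getDependencies: Seq[Dependency[_]] = Seq(
      new NarrowDependency(prev) {
        override def getParents(partitionId: Int): Seq[Int] = {
          val start = partitionId * groupSize
          start until math.min(start + groupSize, prev.partitions.length)
        }
      })

    override protected def getPartitions: Array[Partition] = {
      val numParents = prev.partitions.length
      val numGroups = (numParents + groupSize - 1) / groupSize
      Array.tabulate[Partition](numGroups) { i =>
        val start = i * groupSize
        new GroupedPartition(i, start until math.min(start + groupSize, numParents))
      }
    }

    // Just concatenate the iterators of the parent partitions, in order.
    override def compute(split: Partition, context: TaskContext): Iterator[T] =
      split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { i =>
        prev.iterator(prev.partitions(i), context)
      }
  }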
Re: ClassNotFoundException
Hi Ralph, It seems like the https://issues.apache.org/jira/browse/SPARK-6299 issue, which I'm working on. I submitted a PR for it; would you test it? Regards, Kevin On Tue, Mar 17, 2015 at 1:11 AM Ralph Bergmann ra...@dasralph.de wrote: Hi, I want to try the JavaSparkPi example[1] on a remote Spark server but I get a ClassNotFoundException. When I run it locally it works, but not remotely. I added the spark-core lib as a dependency. Do I need more? Any ideas? Thanks Ralph [1] ... https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
Re: Spark 1.3 createDataframe error with pandas df
kevindahl wrote I'm trying to create a spark data frame from a pandas data frame, but for even the most trivial of datasets I get an error along the lines of this: --- Py4JJavaError Traceback (most recent call last) ipython-input-11-7857f9a55971 in module () 3 BabyDataSet = zip(names,births) 4 df = pd.DataFrame(data = BabyDataSet, columns=['Names', 'Births']) 5 rdf = sqlCtx.createDataFrame(df) C:\spark\python\pyspark\sql\context.pyc in createDataFrame(self, data, schema, samplingRatio) 332 333 if isinstance(schema, (list, tuple)): -- 334 first = data.first() 335 if not isinstance(first, (list, tuple)): 336 raise ValueError(each row in `rdd` should be list or tuple, C:\spark\python\pyspark\rdd.pyc in first(self) 1241 ValueError: RDD is empty 1242 - 1243 rs = self.take(1) 1244 if rs: 1245 return rs[0] C:\spark\python\pyspark\rdd.pyc in take(self, num) 1223 1224 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) - 1225 res = self.context.runJob(self, takeUpToNumLeft, p, True) 1226 1227 items += res C:\spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal) 841 # SparkContext#runJob. 842 mappedRDD = rdd.mapPartitions(partitionFunc) -- 843 it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal) 844 return list(mappedRDD._collect_iterator_through_file(it)) 845 C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: C:\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 7, localhost): java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:189) at java.net.SocketInputStream.read(SocketInputStream.java:121) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.DataInputStream.readInt(DataInputStream.java:387) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:108) at org.apache.spark.api.python.PythonRDD$$anon$1. 
init (PythonRDD.scala:176) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at
RE: How to set Spark executor memory?
Hi Xi Shen, You could set spark.executor.memory in the code itself: new SparkConf().set("spark.executor.memory", "2g") Or you can try setting spark.executor.memory to 2g while submitting the jar. Regards Jishnu Prathap From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, March 16, 2015 2:06 PM To: Xi Shen Cc: user@spark.apache.org Subject: Re: How to set Spark executor memory? By default spark.executor.memory is set to 512m. I'm assuming that since you are submitting the job using spark-submit, it is not able to override the value because you are running in local mode. Can you try it without using spark-submit, as a standalone project? Thanks Best Regards On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen davidshe...@gmail.com wrote: I set it in code, not by configuration. I submit my jar file to local. I am working in my developer environment. On Mon, 16 Mar 2015 18:28 Akhil Das ak...@sigmoidanalytics.com wrote: How are you setting it? And how are you submitting the job? Thanks Best Regards On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have set spark.executor.memory to 2048m, and in the UI Environment page, I can see this value has been set correctly. But in the Executors page, I saw there's only 1 executor and its memory is 265.4MB. Very strange value. Why not 256MB, or just what I set? What am I missing here? Thanks, David
HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi, Trying to run spark ( 1.2.1 built for hdp 2.2) against a yarn cluster results in the AM failing to start with following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster and the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error ? Thanks, Bharath
Re: k-means hang without error/warning
Hi Sean, My system is Windows 64 bit. I looked into the resource manager: Java is the only process that used about 13% CPU resources; there is no disk activity related to Java; only about 6GB of memory is used out of 56GB in total. My system responds very well. I don't think it is a system issue. Thanks, David On Mon, 16 Mar 2015 22:30 Sean Owen so...@cloudera.com wrote: I think you'd have to say more about stopped working. Is the GC thrashing? Does the UI respond? Is the CPU busy or not? On Mon, Mar 16, 2015 at 4:25 AM, Xi Shen davidshe...@gmail.com wrote: Hi, I am running k-means using Spark in local mode. My data set is about 30k records, and I set k = 1000. The algorithm starts and finishes 13 jobs according to the UI monitor, then it stops working. The last log I saw was: [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned broadcast 16 There are many similar log lines repeated, but it seems it always stops at the 16th. If I lower the k value, the algorithm terminates. So I just want to know what's wrong with k=1000. Thanks, David
Re: Can I start multiple executors in local mode?
Hi David, You can try local-cluster mode. The numbers in local-cluster[2,2,1024] mean 2 workers, 2 cores per worker, and 1024 MB of memory per worker. Best Regards Peng Xu 2015-03-16 19:46 GMT+08:00 Xi Shen davidshe...@gmail.com: Hi, In YARN mode you can specify the number of executors. I wonder if we can also start multiple executors locally, just to make the test run faster. Thanks, David
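A minimal sketch of what that might look like in code (untested; the app name is a placeholder, and note that local-cluster mode is mainly intended for Spark's own tests):

  import org.apache.spark.{SparkConf, SparkContext}

  // 2 workers x 2 cores x 1024 MB each, all launched on the local machine
  val conf = new SparkConf()
    .setMaster("local-cluster[2,2,1024]")
    .setAppName("multi-executor-local-test")
  val sc = new SparkContext(conf)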
Re: unable to access spark @ spark://debian:7077
I can access the management webpage at port 8080 from my Mac and it told me that the master and 1 slave are running and that I can access them at port 7077. But the port scanner shows that port 8080 is open but not port 7077. I started the port scanner on the same machine where Spark is running. Ralph On 16.03.15 at 13:51, Sean Owen wrote: Are you sure the master / slaves started? Do you have network connectivity between the two? Do you have multiple interfaces maybe? Does debian resolve correctly and as you expect to the right host/interface?
insert hive partitioned table
Hi, I tried to insert into a Hive partitioned table: val ZONE: Int = Integer.valueOf(args(2)) val MONTH: Int = Integer.valueOf(args(3)) val YEAR: Int = Integer.valueOf(args(4)) val weightedUVToDF = weightedUVToRecord.toDF() weightedUVToDF.registerTempTable("speeddata") hiveContext.sql("INSERT OVERWRITE table speed partition (year=" + YEAR + ",month=" + MONTH + ",zone=" + ZONE + ") select key, speed, direction from speeddata") First I registered a temporary table speeddata. The value of the partition columns (year, month, zone) comes from user input. If I would like to get the value of the partition columns from the temporary table instead, how can I do that? BR, Patcharee
Re: Iterative Algorithms with Spark Streaming
MLlib supports streaming linear models: http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression and k-means: http://spark.apache.org/docs/latest/mllib-clustering.html#k-means With an iteration parameter of 1, this amounts to mini-batch SGD where the mini-batch is the Spark Streaming batch. On Mon, Mar 16, 2015 at 2:57 PM, Alex Minnaar aminn...@verticalscope.com wrote: I wanted to ask a basic question about the types of algorithms that are possible to apply to a DStream with Spark Streaming. With Spark it is possible to perform iterative computations on RDDs like in the gradient descent example val points = spark.textFile(...).map(parsePoint).cache() var w = Vector.random(D) // current separating plane for (i <- 1 to ITERATIONS) { val gradient = points.map(p => (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x ).reduce(_ + _) w -= gradient } which has a global state w that is updated after each iteration, and the updated value is then used in the next iteration. My question is whether this type of algorithm is possible if the points variable was a DStream instead of an RDD? It seems like you could perform the same map as above, which would create a gradient DStream, and also use updateStateByKey to create a DStream for the w variable. But the problem is that there doesn't seem to be a way to reuse the w DStream inside the map. I don't think that it is possible for DStreams to communicate this way. Am I correct that this is not possible with DStreams, or am I missing something? Note: The reason I ask this question is that many machine learning algorithms are trained by stochastic gradient descent. SGD is similar to the above gradient descent algorithm except each iteration is on a new mini-batch of data points rather than the same data points for every iteration. It seems like Spark Streaming provides a natural way to stream in these mini-batches (as RDDs), but if it is not able to keep track of an updating global state variable then I don't think Spark Streaming can be used for SGD. Thanks, Alex
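A hedged sketch of the streaming linear regression API mentioned above (untested; numFeatures and the two input streams are placeholders):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
  import org.apache.spark.streaming.dstream.DStream

  val trainingStream: DStream[LabeledPoint] = ???   // one mini-batch per streaming batch
  val testStream: DStream[LabeledPoint] = ???
  val numFeatures = 10

  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
    .setStepSize(0.1)
    .setNumIterations(1)   // one pass per batch, i.e. the mini-batch SGD described above

  model.trainOn(trainingStream)
  model.predictOnValues(testStream.map(lp => (lp.label, lp.features))).print()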
Re: insert hive partitioned table
Not quite sure whether I understand your question properly. But if you just want to read the partition columns, it's pretty easy. Taking the "year" column as an example, you may do this in HiveQL: hiveContext.sql("SELECT year FROM speed") or in the DataFrame DSL: hiveContext.table("speed").select("year") Cheng
Re: insert hive partitioned table
I would like to insert into the table, and the value of the partition columns to be inserted must come from the temporary registered table/dataframe. Patcharee
Re: MappedStream vs Transform API
Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation uses a subclass when it can use an existing API? Unless there is a real difference, it feels like a code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think these two ways are both OK for you to write streaming jobs. `transform` is a more general way for you to transform from one DStream to another if there's no related DStream API (but there is a related RDD API). But using map may be more straightforward and easier to understand. Thanks Jerry From: madhu phatak [mailto:phatak@gmail.com] Sent: Monday, March 16, 2015 4:32 PM To: user@spark.apache.org Subject: MappedStream vs Transform API Hi, The current implementation of the map function in Spark Streaming looks as below: def map[U: ClassTag](mapFunc: T => U): DStream[U] = { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream, which is a subclass of DStream. The same function could also be implemented using the transform API: def map[U: ClassTag](mapFunc: T => U): DStream[U] = this.transform(rdd => { rdd.map(mapFunc) }) Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API? Regards, Madhukara Phatak http://datamantra.io/
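From a user's perspective the two are interchangeable; a tiny sketch (assuming `lines` is a DStream[String]):

  val upper1 = lines.map(_.toUpperCase)                        // backed by MappedDStream internally
  val upper2 = lines.transform(rdd => rdd.map(_.toUpperCase))  // same result via the generic transform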
Re: Processing of text file in large gzip archive
1. I don't think textFile is capable of unpacking a .gz file. You need to use hadoopFile or newAPIHadoop file for this. Sorry that’s incorrect, textFile works fine on .gz files. What it can’t do is compute splits on gz files, so if you have a single file, you'll have a single partition. Processing 30 GB of gzipped data should not take that long, at least with the Scala API. Python not sure, especially under 1.2.1.
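A small sketch of the practical consequence of Sean's point (untested; the path and partition count are placeholders):

  // a .gz file is not splittable, so it is read as a single partition
  val lines = sc.textFile("s3n://bucket/archive.txt.gz")
  // repartition explicitly if you want parallelism in later stages
  val parallel = lines.repartition(64)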
Re: Does spark-1.3.0 support the analytic functions defined in Hive, such as row_number, rank
You can track the issue here: https://issues.apache.org/jira/browse/SPARK-1442 It's currently not supported; I guess the test cases are work in progress. On Mon, Mar 16, 2015 at 12:44 PM, hseagle hsxup...@gmail.com wrote: Hi all, I'm wondering whether the latest spark-1.3.0 supports the windowing and analytic functions in Hive, such as row_number, rank, etc. Indeed, I've done some testing by using spark-shell and found that row_number is not supported yet. But I still found that there were some test cases related to row_number and other analytic functions. These test cases are defined in sql/hive/target/scala-2.10/test-classes/ql/src/test/queries/clientpositive/windowing_multipartitioning.q So my question is divided into two parts: one is whether the analytic functions are supported or not, the other one is, if they are not supported, why there are still some test cases. hseagle -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Parquet and repartition
Hi all. When I specify the number of partitions and save this RDD in Parquet format, my app fails. For example: selectTest.coalesce(28).saveAsParquetFile("hdfs://vm-clusterOutput") However, it works well if I store the data as text: selectTest.coalesce(28).saveAsTextFile("hdfs://vm-clusterOutput") My Spark version is 1.2.1. Is this bug registered? -- Regards, Miguel Ángel
Error when using multiple python files spark-submit
I have a spark app which is composed of multiple files. When I launch Spark using: ../hadoop/spark-install/bin/spark-submit main.py --py-files /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py --master spark://spark-m:7077 I am getting an error: 15/03/13 15:54:24 INFO TaskSetManager: Lost task 6.3 in stage 413.0 (TID 5817) on executor spark-w-3.c.databerries.internal: org.apache.spark.api.python.PythonException (Traceback (most recent call last): File /home/hadoop/spark-install/python/pyspark/worker.py, line 90, in main command = pickleSer._read_with_length(infile) File /home/hadoop/spark-install/python/pyspark/serializers.py, line 151, in _read_with_length return self.loads(obj) File /home/hadoop/spark-install/python/pyspark/serializers.py, line 396, in loads return cPickle.loads(obj) ImportError: No module named naive It is weird because I do not serialize anything. naive.py is also available on every machine at the same path. Any insight on what could be going on? The issue does not happen on my laptop. PS : I am using Spark 1.2.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-using-multiple-python-files-spark-submit-tp22080.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Can I start multiple executors in local mode?
Hi, In YARN mode you can specify the number of executors. I wonder if we can also start multiple executors at local, just to make the test run faster. Thanks, David
Re: Handling fatal errors of executors and decommission datanodes
There are 2 cases for No space left on device: 1. Some tasks which use large temp space cannot run on any node. 2. The free space of the datanodes is not balanced. Some tasks which use large temp space cannot run on several nodes, but they can run on other nodes successfully. Because most of our cases are the second one, we set spark.scheduler.executorTaskBlacklistTime to 30000 to solve such No space left on device errors. So if a task runs unsuccessfully on some executor, it won't be scheduled on the same executor for 30 seconds. Best Regards, Shixiong Zhu 2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com: I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, We're facing No space left on device errors lately from time to time. The job will fail after retries. Obviously in such a case, retrying won't be helpful. Sure, it's a problem in the datanodes, but I'm wondering if the Spark driver can handle it and decommission the problematic datanode before retrying, and maybe dynamically allocate another datanode if dynamic allocation is enabled. I think there needs to be a class of fatal errors that can't be recovered with retries, and it's best if Spark can handle them nicely. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
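A minimal sketch of setting that property (untested; the value is in milliseconds, so 30000 corresponds to the 30 seconds described above):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.scheduler.executorTaskBlacklistTime", "30000")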
Re: jar conflict with Spark default packaging
Thanks a lot. I will give it a try! On Monday, March 16, 2015, Adam Lewandowski adam.lewandow...@gmail.com wrote: Prior to 1.3.0, Spark has 'spark.files.userClassPathFirst' for non-YARN apps. For 1.3.0, use 'spark.executor.userClassPathFirst'. See https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCALrvLxdWwSByxNvcZtTVo8BsNRR_7tbPzWdUiAV8Ps8H1oAayQ%40mail.gmail.com%3E On Fri, Mar 13, 2015 at 1:04 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am running Spark to deal with AWS, and the latest AWS SDK version works with httpclient 3.4+. But the spark-assembly-*.jar file has packaged an old httpclient version, which causes: ClassNotFoundException for org/apache/http/client/methods/HttpPatch Even when I put the right httpclient jar there, it won't help because Spark always takes the class from its own packaging first. I don't know why Spark only provides a big package which doesn't allow us to customize the library loading sequence. I know I can just rebuild Spark, but this is very troublesome, and it should not be a general solution for the long term (I can't rebuild the Spark jar every time I have a jar conflict, as Spark is supposed to be a cluster). In Hadoop, we have "mapreduce.job.user.classpath.first=true", but "spark.yarn.user.classpath.first" only works for YARN. I think I am not the only one who faces this issue. Does anyone have a more general solution for this? Regards, Shuai
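A hedged sketch of the two settings Adam mentions (which one applies depends on the Spark version, as described above):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.files.userClassPathFirst", "true")        // Spark < 1.3.0, non-YARN apps
  // on Spark 1.3.0+ use:
  //  .set("spark.executor.userClassPathFirst", "true")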
Re: How to set Spark executor memory?
There are a number of small misunderstandings here. In the first instance, the executor memory is not actually being set to 2g and the default of 512m is being used. If you are writing code to launch an app, then you are trying to duplicate what spark-submit does, and you don't use spark-submit. If you use spark-submit, your configuration happens too late. The memory you see in the UI is not total executor memory. it is memory available for caching. The default formula is actually 0.6 * 0.9 * total, not 0.6 * total. This is not a function of your machines total memory, but of the configured executor memory. if this value is 6.7GB it implies that you somehow configured the executors to use 12.4GB of memory. Double-check for typos and maybe confirm what figure you are quoting here. In the last instance -- you are looking at driver memory, not executor memory. The 1g you are trying to configure affects executors. On Mon, Mar 16, 2015 at 9:21 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Strange, even i'm having it while running in local mode. I set it as .set(spark.executor.memory, 1g) Thanks Best Regards On Mon, Mar 16, 2015 at 2:43 PM, Xi Shen davidshe...@gmail.com wrote: I set spark.executor.memory to 2048m. If the executor storage memory is 0.6 of executor memory, it should be 2g * 0.6 = 1.2g. My machine has 56GB memory, and 0.6 of that should be 33.6G...I hate math xD On Mon, Mar 16, 2015 at 7:59 PM Akhil Das ak...@sigmoidanalytics.com wrote: How much memory are you having on your machine? I think default value is 0.6 of the spark.executor.memory as you can see from here. Thanks Best Regards On Mon, Mar 16, 2015 at 2:26 PM, Xi Shen davidshe...@gmail.com wrote: Hi Akhil, Yes, you are right. If I ran the program from IDE as a normal java program, the executor's memory is increased...but not to 2048m, it is set to 6.7GB...Looks like there's some formula to calculate this value. Thanks, David On Mon, Mar 16, 2015 at 7:36 PM Akhil Das ak...@sigmoidanalytics.com wrote: By default spark.executor.memory is set to 512m, I'm assuming since you are submiting the job using spark-submit and it is not able to override the value since you are running in local mode. Can you try it without using spark-submit as a standalone project? Thanks Best Regards On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen davidshe...@gmail.com wrote: I set it in code, not by configuration. I submit my jar file to local. I am working in my developer environment. On Mon, 16 Mar 2015 18:28 Akhil Das ak...@sigmoidanalytics.com wrote: How are you setting it? and how are you submitting the job? Thanks Best Regards On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have set spark.executor.memory to 2048m, and in the UI Environment page, I can see this value has been set correctly. But in the Executors page, I saw there's only 1 executor and its memory is 265.4MB. Very strange value. why not 256MB, or just as what I set? What am I missing here? Thanks, David - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
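To make the arithmetic in Sean's explanation concrete, a rough sketch (the fractions are the defaults he cites; exact UI numbers differ slightly because the JVM reports a max heap a bit below what was requested):

  val executorMemoryMb = 512.0                          // default spark.executor.memory
  val cacheCapacityMb  = executorMemoryMb * 0.6 * 0.9   // ≈ 276 MB, close to the 265.4 MB seen in the UI
  val impliedTotalGb   = 6.7 / (0.6 * 0.9)              // ≈ 12.4 GB if the UI shows 6.7 GB for caching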
Re: configure number of cached partition in memory on SparkSQL
Hi Judy, In the case of HadoopRDD and NewHadoopRDD, the partition number is actually decided by the InputFormat used. And spark.sql.inMemoryColumnarStorage.batchSize is not related to partition number; it controls the in-memory columnar batch size within a single partition. Also, what do you mean by "change the number of partitions after caching the table"? Are you trying to re-cache an already cached table with a different partition number? Currently, I don't see a super intuitive pure SQL way to set the partition number in this case. Maybe you can try this (assuming table t has a column s which is expected to be sorted): SET spark.sql.shuffle.partitions = 10; CACHE TABLE cached_t AS SELECT * FROM t ORDER BY s; In this way, we introduce a shuffle by sorting a column, and zoom in/out the partition number at the same time. This might not be the best way out there, but it's the first one that jumped into my head. Cheng On 3/5/15 3:51 AM, Judy Nash wrote: Hi, I am tuning a hive dataset on Spark SQL deployed via thrift server. How can I change the number of partitions after caching the table on thrift server? I have tried the following but still getting the same number of partitions after caching: Spark.default.parallelism spark.sql.inMemoryColumnarStorage.batchSize Thanks, Judy
Re: Handling fatal errors of executors and decommission datanodes
I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, We're facing No space left on device errors lately from time to time. The job will fail after retries. Obviously, in such a case retrying won't be helpful. Sure, it's a problem in the datanodes, but I'm wondering if the Spark Driver can handle it and decommission the problematic datanode before retrying, and maybe dynamically allocate another datanode if dynamic allocation is enabled. I think there needs to be a class of fatal errors that can't be recovered with retries, and it's best if Spark can handle them nicely. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Parquet and repartition
Thanks Sean, I forgot it The ouput error is the following: java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org $apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/16 11:30:11 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 207) java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org $apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/16 11:30:11 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 208, localhost, ANY, 2878 bytes) 15/03/16 11:30:11 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 206, localhost): java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328) at 
org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) On Mon, Mar 16, 2015 at 12:19 PM, Sean Owen so...@cloudera.com wrote: You forgot to give any information about what fail means here. On Mon, Mar 16, 2015 at 11:11 AM, Masf masfwo...@gmail.com wrote: Hi all. When I specify the number of partitions and save this RDD in parquet format, my app fails. For example
Re: k-means hang without error/warning
I think you'd have to say more about stopped working. Is the GC thrashing? Does the UI respond? Is the CPU busy or not? On Mon, Mar 16, 2015 at 4:25 AM, Xi Shen davidshe...@gmail.com wrote: Hi, I am running k-means using Spark in local mode. My data set is about 30k records, and I set k = 1000. The algorithm starts and finishes 13 jobs according to the UI monitor, then it stops working. The last log I saw was: [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned broadcast 16 There are many similar log lines repeated, but it seems it always stops at the 16th. If I lower the k value, the algorithm terminates. So I just want to know what's wrong with k=1000. Thanks, David - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: unable to access spark @ spark://debian:7077
Are you sure the master / slaves started? Do you have network connectivity between the two? Do you have multiple interfaces maybe? Does debian resolve correctly and as you expect to the right host/interface? On Mon, Mar 16, 2015 at 8:14 AM, Ralph Bergmann ra...@dasralph.de wrote: Hi, I try my first steps with Spark but I have problems to access Spark running on my Linux server from my Mac. I start Spark with sbin/start-all.sh When I now open the website at port 8080 I see that all is running and I can access Spark at port 7077 but this doesn't work. I scanned the Linux machine with nmap and port 7077 isn't open. On my Mac side I get this error message: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/16 09:11:41 INFO SparkContext: Running Spark version 1.3.0 2015-03-16 09:11:41.782 java[1004:46676] Unable to load realm info from SCDynamicStore 15/03/16 09:11:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/16 09:11:42 INFO SecurityManager: Changing view acls to: dasralph 15/03/16 09:11:42 INFO SecurityManager: Changing modify acls to: dasralph 15/03/16 09:11:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dasralph); users with modify permissions: Set(dasralph) 15/03/16 09:11:43 INFO Slf4jLogger: Slf4jLogger started 15/03/16 09:11:43 INFO Remoting: Starting remoting 15/03/16 09:11:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@imac_wlan.lan:52886] 15/03/16 09:11:43 INFO Utils: Successfully started service 'sparkDriver' on port 52886. 15/03/16 09:11:43 INFO SparkEnv: Registering MapOutputTracker 15/03/16 09:11:43 INFO SparkEnv: Registering BlockManagerMaster 15/03/16 09:11:43 INFO DiskBlockManager: Created local directory at /var/folders/h3/r2qtlmbn1cd6ctj_rcyyq_24gn/T/spark-9cce9d78-a0e6-4fb5-8cf6-00d91c764927/blockmgr-bd444818-a50a-4ea0-9cf6-3b2545f32238 15/03/16 09:11:43 INFO MemoryStore: MemoryStore started with capacity 1966.1 MB 15/03/16 09:11:43 INFO HttpFileServer: HTTP File server directory is /var/folders/h3/r2qtlmbn1cd6ctj_rcyyq_24gn/T/spark-dd67dc02-c0b7-4167-b8d5-29f057cfb253/httpd-8534edfe-46b8-49ea-9273-3e8e47947332 15/03/16 09:11:43 INFO HttpServer: Starting HTTP Server 15/03/16 09:11:44 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/16 09:11:44 INFO AbstractConnector: Started SocketConnector@0.0.0.0:52913 15/03/16 09:11:44 INFO Utils: Successfully started service 'HTTP file server' on port 52913. 15/03/16 09:11:44 INFO SparkEnv: Registering OutputCommitCoordinator 15/03/16 09:11:44 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/16 09:11:44 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/03/16 09:11:44 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/16 09:11:44 INFO SparkUI: Started SparkUI at http://imac_wlan.lan:4040 15/03/16 09:11:44 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@debian:7077/user/Master... 15/03/16 09:11:45 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@debian:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/03/16 09:12:04 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@debian:7077/user/Master... 15/03/16 09:12:04 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@debian:7077] has failed, address is now gated for [5000] ms. 
Reason is: [Disassociated]. 15/03/16 09:12:24 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@debian:7077/user/Master... 15/03/16 09:12:24 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@debian:7077] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/03/16 09:12:44 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up. 15/03/16 09:12:44 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up. 15/03/16 09:12:44 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet. 15/03/16 09:12:45 INFO NettyBlockTransferService: Server created on 53666 15/03/16 09:12:45 INFO BlockManagerMaster: Trying to register BlockManager 15/03/16 09:12:45 INFO BlockManagerMasterActor: Registering block manager imac_wlan.lan:53666 with 1966.1 MB RAM, BlockManagerId(driver, imac_wlan.lan, 53666) 15/03/16 09:12:45 INFO BlockManagerMaster: Registered BlockManager 15/03/16 09:12:45 ERROR MetricsSystem: Sink class org.apache.spark.metrics.sink.MetricsServlet cannot be instantialized What's going wrong? Ralph -- Ralph Bergmann www http://www.dasralph.de | http://www.the4thFloor.eu mail ra...@dasralph.de skype
Iterative Algorithms with Spark Streaming
I wanted to ask a basic question about the types of algorithms that are possible to apply to a DStream with Spark Streaming. With Spark it is possible to perform iterative computations on RDDs, like in the gradient descent example: val points = spark.textFile(...).map(parsePoint).cache() var w = Vector.random(D) // current separating plane for (i <- 1 to ITERATIONS) { val gradient = points.map(p => (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x ).reduce(_ + _) w -= gradient } which has a global state w that is updated after each iteration, and the updated value is then used in the next iteration. My question is whether this type of algorithm is possible if the points variable were a DStream instead of an RDD? It seems like you could perform the same map as above, which would create a gradient DStream, and also use updateStateByKey to create a DStream for the w variable. But the problem is that there doesn't seem to be a way to reuse the w DStream inside the map. I don't think that it is possible for DStreams to communicate this way. Am I correct that this is not possible with DStreams, or am I missing something? Note: The reason I ask this question is that many machine learning algorithms are trained by stochastic gradient descent. SGD is similar to the above gradient descent algorithm except that each iteration is on a new minibatch of data points rather than the same data points for every iteration. It seems like Spark Streaming provides a natural way to stream in these minibatches (as RDDs), but if it is not able to keep track of an updating global state variable then I don't think Spark Streaming can be used for SGD. Thanks, Alex
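A pattern that sidesteps the problem Alex describes (this is an illustration, not part of the original thread) is to keep the weight vector as plain mutable state on the driver and update it once per mini-batch inside foreachRDD, instead of trying to model w as a DStream; for linear models, MLlib's StreamingLinearRegressionWithSGD packages essentially this idea. A minimal sketch, assuming a DStream[LabeledPoint] called points and feature dimension D:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.dstream.DStream

def streamingLogisticSGD(points: DStream[LabeledPoint], D: Int, stepSize: Double): Unit = {
  // The weights live on the driver, not inside a DStream.
  var w = Array.fill(D)(math.random)

  points.foreachRDD { rdd =>
    if (rdd.count() > 0) {
      val wLocal = w // snapshot of the current weights, shipped to the executors
      val gradient = rdd.map { p =>
        val margin = (0 until D).map(i => wLocal(i) * p.features(i)).sum
        val factor = (1.0 / (1.0 + math.exp(-p.label * margin)) - 1.0) * p.label
        p.features.toArray.map(_ * factor)
      }.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })

      // Update the global state once per mini-batch, on the driver.
      w = w.zip(gradient).map { case (wi, gi) => wi - stepSize * gi }
    }
  }
}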
Re: How to preserve/preset partition information when load time series data?
Hi Shuai, It should certainly be possible to do it that way, but I would recommend against it. If you look at HadoopRDD, it's doing all sorts of little book-keeping that you would most likely want to mimic, e.g., tracking the number of bytes and records that are read, setting up all the hadoop configuration, splits, readers, scheduling tasks for locality, etc. That's why I suggested that really you want to just create a small variant of HadoopRDD. Hope that helps, Imran On Sat, Mar 14, 2015 at 11:10 AM, Shawn Zheng szheng.c...@gmail.com wrote: Sorry for the late reply. But I just thought of one solution: if I load all the file names themselves (not the contents of the files), so I have an RDD[key, iterable[filename]], then I run mapPartitionsToPair on it with preservesPartitioning=true. Do you think it is a right solution? I am not sure whether it has potential issues if I try to fake/enforce the partitioning in my own way. Regards, Shuai On Wed, Mar 11, 2015 at 8:09 PM, Imran Rashid iras...@cloudera.com wrote: It should be *possible* to do what you want ... but if I understand you right, there isn't really any very easy way to do it. I think you would need to write your own subclass of RDD, which has its own logic on how the input files get divided among partitions. You can probably subclass HadoopRDD and just modify getPartitions(). Your logic could look at the day of each filename to decide which partition it goes into. You'd need to make corresponding changes for HadoopPartition and the compute() method. (Or if you can't subclass HadoopRDD directly you can use it for inspiration.) On Mon, Mar 9, 2015 at 11:18 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, If I have a set of time series data files, they are in parquet format and the data for each day are stored following a naming convention, but I will not know how many files there are for one day. 20150101a.parq 20150101b.parq 20150102a.parq 20150102b.parq 20150102c.parq … 201501010a.parq … Now I try to write a program to process the data. And I want to make sure each day's data goes into one partition; of course I can load all of it into one big RDD and repartition, but it will be very slow. As I already know the time series from the file names, is it possible for me to load the data into the RDD and also preserve the partitioning? I know I can preserve a partition per file, but is there any way for me to load the RDD and preserve partitioning based on a set of files: one partition, multiple files? I think it is possible because when I load an RDD from 100 files (assume across 100 days), I will have 100 partitions (if I disable file splitting when loading files). Then I can use a special coalesce to repartition the RDD? But I don't know whether it is possible to do that in Spark now. Regards, Shuai
RDD to DataFrame for using ALS under org.apache.spark.ml.recommendation.ALS
Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to invoke for training seems to be override def fit(dataset: DataFrame, paramMap: ParamMap): ALSModel. How do I create a DataFrame object from a ratings data set that is on HDFS? For comparison, the method in the old ALS implementation under org.apache.spark.mllib.recommendation.ALS was def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int, seed: Long ): MatrixFactorizationModel My code to run the old ALS train method is as below: val sc = new SparkContext(conf) val pfile = args(0) val purchase = sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toInt) }) val model = ALS.train(ratings, rank, numIterations, 0.01) Now, for the new ALS fit method, I am trying to use the below code to run, but getting a compilation error: val als = new ALS() .setRank(rank) .setRegParam(regParam) .setImplicitPrefs(implicitPrefs) .setNumUserBlocks(numUserBlocks) .setNumItemBlocks(numItemBlocks) val sc = new SparkContext(conf) val pfile = args(0) val purchase = sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toInt) }) val model = als.fit(ratings.toDF()) I get an error that the method toDF() is not a member of org.apache.spark.rdd.RDD[org.apache.spark.ml.recommendation.ALS.Rating[Int]]. Appreciate the help! Thanks, Jay -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-DataFrame-for-using-ALS-under-org-apache-spark-ml-recommendation-ALS-tp22083.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
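For what it's worth, a minimal sketch of one way to get toDF() working against the new API in Spark 1.3 (this is not from the thread; the RatingRow case class, its field names, which match ml.ALS's default user/item/rating column names, and the parsing are assumptions based on the code above): it needs an SQLContext and an import of its implicits.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.recommendation.ALS

// Field names chosen to match ml.ALS's default userCol/itemCol/ratingCol.
case class RatingRow(user: Int, item: Int, rating: Float)

object AlsFitExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("als-fit-example")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings toDF() into scope for RDDs of case classes

    val ratings = sc.textFile(args(0)).map(_.split(',') match {
      case Array(user, item, rate) => RatingRow(user.toInt, item.toInt, rate.toFloat)
    })

    val als = new ALS().setRank(10).setMaxIter(10).setRegParam(0.01)
    val model = als.fit(ratings.toDF())
  }
}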
Re: Problem connecting to HBase
Hello Ted, Yes, I can understand what you are suggesting. But I am unable to decipher where I am going wrong, could you please point out what are the locations to be looked at to be able to find and correct the mistake? I greatly appreciate your help! On Sun, Mar 15, 2015 at 1:10 PM, Ted Yu yuzhih...@gmail.com wrote: org.apache.hbase % hbase % 0.98.9-hadoop2 % provided, There is no module in hbase 0.98.9 called hbase. But this would not be the root cause of the error. Most likely hbase-site.xml was not picked up. Meaning this is classpath issue. On Sun, Mar 15, 2015 at 10:04 AM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Hello all, Thank you for your responses. I did try to include the zookeeper.znode.parent property in the hbase-site.xml. It still continues to give the same error. I am using Spark 1.2.0 and hbase 0.98.9. Could you please suggest what else could be done? On Fri, Mar 13, 2015 at 10:25 PM, Ted Yu yuzhih...@gmail.com wrote: In HBaseTest.scala: val conf = HBaseConfiguration.create() You can add some log (for zookeeper.znode.parent, e.g.) to see if the values from hbase-site.xml are picked up correctly. Please use pastebin next time you want to post errors. Which Spark release are you using ? I assume it contains SPARK-1297 Cheers On Fri, Mar 13, 2015 at 7:47 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote: Hello, I am running a HBase test case. I am using the example from the following: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala I created a very small HBase table with 5 rows and 2 columns. I have attached a screenshot of the error log. I believe it is a problem where the driver program is unable to establish connection to the hbase. The following is my simple.sbt: name := Simple Project version := 1.0 scalaVersion := 2.10.4 libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.2.0, org.apache.hbase % hbase % 0.98.9-hadoop2 % provided, org.apache.hbase % hbase-client % 0.98.9-hadoop2 % provided, org.apache.hbase % hbase-server % 0.98.9-hadoop2 % provided, org.apache.hbase % hbase-common % 0.98.9-hadoop2 % provided ) I am using a 23 node cluster, did copy hbase-site.xml into /spark/conf folder and set spark.executor.extraClassPath pointing to the /hbase/ folder in the spark-defaults.conf Also, while submitting the spark job I am including the required jars : spark-submit --class HBaseTest --master yarn-cluster --driver-class-path /opt/hbase/0.98.9/lib/hbase-server-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-protocol-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-hadoop2-compat-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-client-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/hbase-common-0.98.9-hadoop2.jar:/opt/hbase/0.98.9/lib/htrace-core-2.04.jar /home/priya/usingHBase/Spark/target/scala-2.10/simple-project_2.10-1.0.jar /Priya/sparkhbase-test1 It would be great if you could point where I am going wrong, and what could be done to correct it. Thank you for your time. -- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact : 650-796-7112 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact : 650-796-7112 -- Regards, Haripriya Ayyalasomayajula Graduate Student Department of Computer Science University of Houston Contact : 650-796-7112
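As a side note, the plain-text archive has stripped the quotes from the sbt snippet quoted above; a cleaned-up version would look roughly like the following (same versions as in the thread, with the non-existent "hbase" % "hbase" module that Ted points out removed):

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.2.0",
  "org.apache.hbase" %  "hbase-client" % "0.98.9-hadoop2" % "provided",
  "org.apache.hbase" %  "hbase-server" % "0.98.9-hadoop2" % "provided",
  "org.apache.hbase" %  "hbase-common" % "0.98.9-hadoop2" % "provided"
)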
ClassNotFoundException
Hi, I want to try the JavaSparkPi example[1] on a remote Spark server but I get a ClassNotFoundException. When I run it local it works but not remote. I added the spark-core lib as dependency. Do I need more? Any ideas? Thanks Ralph [1] ... https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/16 17:02:45 INFO CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT] 2015-03-16 17:02:45.624 java[5730:1133038] Unable to load realm info from SCDynamicStore 15/03/16 17:02:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/16 17:02:45 INFO SecurityManager: Changing view acls to: dasralph 15/03/16 17:02:45 INFO SecurityManager: Changing modify acls to: dasralph 15/03/16 17:02:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dasralph); users with modify permissions: Set(dasralph) 15/03/16 17:02:46 INFO Slf4jLogger: Slf4jLogger started 15/03/16 17:02:46 INFO Remoting: Starting remoting 15/03/16 17:02:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.0.0.10:54973] 15/03/16 17:02:46 INFO Utils: Successfully started service 'driverPropsFetcher' on port 54973. 15/03/16 17:02:46 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/03/16 17:02:46 INFO SecurityManager: Changing view acls to: dasralph 15/03/16 17:02:46 INFO SecurityManager: Changing modify acls to: dasralph 15/03/16 17:02:46 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/03/16 17:02:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(dasralph); users with modify permissions: Set(dasralph) 15/03/16 17:02:46 INFO Slf4jLogger: Slf4jLogger started 15/03/16 17:02:46 INFO Remoting: Starting remoting 15/03/16 17:02:46 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. 15/03/16 17:02:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@10.0.0.10:54977] 15/03/16 17:02:46 INFO Utils: Successfully started service 'sparkExecutor' on port 54977. 
15/03/16 17:02:46 INFO AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkDriver@10.0.0.10:54945/user/MapOutputTracker 15/03/16 17:02:46 INFO AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkDriver@10.0.0.10:54945/user/BlockManagerMaster 15/03/16 17:02:46 INFO DiskBlockManager: Created local directory at /var/folders/5p/s1k2jrqx38ncxkm4wlflgfvwgn/T/spark-185c8652-0244-42ff-90b4-fd9c7dbde7b3/spark-8a9ba955-de6f-4ab1-8995-1474ac1ba3a9/spark-2533211f-c399-467f-851d-6b9ec89defdc/blockmgr-82e98f66-d592-42a7-a4fa-1ab78780814b 15/03/16 17:02:46 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/03/16 17:02:46 INFO AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@10.0.0.10:54945/user/OutputCommitCoordinator 15/03/16 17:02:46 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkDriver@10.0.0.10:54945/user/CoarseGrainedScheduler 15/03/16 17:02:46 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.0.0.10:58715/user/Worker 15/03/16 17:02:46 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.0.0.10:58715/user/Worker 15/03/16 17:02:46 INFO CoarseGrainedExecutorBackend: Successfully registered with driver 15/03/16 17:02:46 INFO Executor: Starting executor ID 0 on host 10.0.0.10 15/03/16 17:02:47 INFO NettyBlockTransferService: Server created on 54983 15/03/16 17:02:47 INFO BlockManagerMaster: Trying to register BlockManager 15/03/16 17:02:47 INFO BlockManagerMaster: Registered BlockManager 15/03/16 17:02:47 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.0.0.10:54945/user/HeartbeatReceiver 15/03/16 17:02:47 INFO CoarseGrainedExecutorBackend: Got assigned task 0 15/03/16 17:02:47 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 15/03/16 17:02:47 INFO CoarseGrainedExecutorBackend: Got assigned task 1 15/03/16 17:02:47 INFO Executor: Running task 1.0 in stage 0.0 (TID 1) 15/03/16 17:02:47 INFO TorrentBroadcast: Started reading broadcast variable 0 15/03/16 17:02:47 INFO MemoryStore: ensureFreeSpace(1679) called with curMem=0, maxMem=278302556 15/03/16 17:02:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1679.0 B, free 265.4 MB) 15/03/16 17:02:47 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/16 17:02:47 INFO TorrentBroadcast: Reading broadcast variable 0 took 188 ms 15/03/16 17:02:47 INFO MemoryStore: ensureFreeSpace(2312) called with curMem=1679, maxMem=278302556 15/03/16 17:02:47 INFO MemoryStore: Block broadcast_0 stored as values
Re: Parquet and repartition
Hey Masf, I’ve created SPARK-6360 https://issues.apache.org/jira/browse/SPARK-6360 to track this issue. Detailed analysis is provided there. The TL;DR is, for Spark 1.1 and 1.2, if a SchemaRDD contains decimal or UDT column(s), after applying any traditional RDD transformations (e.g. repartition, coalesce, distinct, …), calling saveAsParquetFile may trigger this issue. Fortunately, Spark 1.3 isn’t affected as we replaced SchemaRDD with DataFrame, which properly handles this case. Cheng On 3/16/15 7:30 PM, Masf wrote: Thanks Sean, I forgot it The ouput error is the following: java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org http://org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$writeShard$1(ParquetTableOperations.scala:308) at org.apache.spark.sql.parquet.InsertIntoParquetTable$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.sql.parquet.InsertIntoParquetTable$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/16 11:30:11 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 207) java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org http://org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$writeShard$1(ParquetTableOperations.scala:308) at org.apache.spark.sql.parquet.InsertIntoParquetTable$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.sql.parquet.InsertIntoParquetTable$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/16 11:30:11 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 208, localhost, ANY, 2878 bytes) 15/03/16 11:30:11 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 206, localhost): java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314) at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org http://org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$writeShard$1(ParquetTableOperations.scala:308) at org.apache.spark.sql.parquet.InsertIntoParquetTable$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325) at
[SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.
Hi All, I am running Spark 1.2.1 and the AWS SDK. To make sure the AWS SDK is compatible with httpclient 4.2 (which I assume Spark uses?), I have already downgraded to version 1.9.0. But even then, I still get an error: Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:102) at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:190) at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119) at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:410) at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:392) at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:376) When I search the mailing list, it looks like the same issue as: https://github.com/apache/spark/pull/2535 http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi But I don't understand the solution mentioned there. The issue is caused by a pre-packaged DefaultClientConnectionOperator in the Spark all-in-one jar file which doesn't have that method. I have some questions here: How can we find out which exact version Spark uses when it pre-packages everything (this is really very painful), and how can we override it? I have tried: val conf = new SparkConf() .set("spark.files.userClassPathFirst", "true") // for non-YARN apps before Spark 1.3 .set("spark.executor.userClassPathFirst", "true") // for Spark 1.3.0 But it doesn't work. This really creates a lot of issues for me (especially since we don't know which version Spark uses to package its own jar; we need to try it out). Even Maven doesn't give enough information, because httpclient is not under the Maven dependencies (even as an indirect dependency, after I used tools to resolve the whole dependency tree). Regards, Shuai
Re: unable to access spark @ spark://debian:7077
Okay, I think I found the mistake. The Eclipse Maven plugin suggested version 1.2.1 of the spark-core lib, but I use Spark 1.3.0. After I fixed that, I can access the Spark server. Ralph On 16.03.15 at 14:39, Ralph Bergmann wrote: I can access the management webpage at port 8080 from my Mac and it told me that the master and 1 slave are running and that I can access them at port 7077. But the port scanner shows that port 8080 is open but not port 7077. I started the port scanner on the same machine where Spark is running. Ralph - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Process time series RDD after sortByKey
Hi Shuai, On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote: Sorry I responded late. Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partitions, the best case is I can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc. If we have the ability to do this, we will gain huge flexibility when we try to process some time-series-like data, and a lot of algorithms will benefit from it. Yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD -- it won't create any tasks, it's just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. It's just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions: val rawData1000Partitions = sc.textFile(...) // or whatever val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc. Note that this is almost exactly the same as what CoalescedRDD does. However, it might combine the partitions in whatever ways it feels like -- you want them combined in a very particular order. So you'll need to create your own subclass. Back to Zhan Zhang's while (iterPartition < RDD.partitions.length) { val res = sc.runJob(this, (it: Iterator[T]) => someFunc, iterPartition, allowLocal = true) Some other function after processing one partition. iterPartition += 1 } I am curious how Spark processes this without parallelism: will the individual partition be passed back to the driver to process, or does it just run one task on the node where that partition exists, then follow with another partition on another node? Not exactly. The partition is not shipped back to the driver. You create a task which will be processed by a worker. The task scheduling will take data locality into account, so ideally the task will get scheduled in the same location where the data already resides. The worker will execute someFunc, and after it's done it will ship the *result* back to the driver. Then the process will get repeated for all the other partitions. If you wanted all the data sent back to the driver, you could use RDD.toLocalIterator. That will send one partition back to the driver, let you process it on the driver, then fetch the next partition, etc. Imran
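To make the suggestion concrete, here is a minimal sketch of what such a custom RDD could look like (this is an illustration, not code from the thread; MyGroupingRDD, GroupedPartition and groupSize are hypothetical names, and a real implementation would also want the kind of book-keeping HadoopRDD does, as Imran notes in the related thread above):

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Remembers which parent partitions this output partition covers.
private class GroupedPartition(val index: Int, val parentIndices: Seq[Int]) extends Partition

// Each output partition reads a contiguous block of parent partitions, in order.
class MyGroupingRDD[T: ClassTag](parent: RDD[T], groupSize: Int)
  extends RDD[T](parent.context, Nil) {

  private val groups: Array[Seq[Int]] =
    parent.partitions.map(_.index).grouped(groupSize).map(_.toSeq).toArray

  override def getPartitions: Array[Partition] =
    groups.zipWithIndex.map { case (parents, i) => new GroupedPartition(i, parents) }

  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency(parent) {
      override def getParents(partitionId: Int): Seq[Int] = groups(partitionId)
    }
  )

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { i =>
      parent.iterator(parent.partitions(i), context)
    }
}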
Priority queue in spark
Hi, Currently all the jobs in Spark get submitted using a queue. I have a requirement where a submitted job will generate another set of jobs with some priority, which should again be submitted to the Spark cluster based on that priority. This means a job with higher priority should be executed first. Is it feasible? Any help is appreciated. Thanks, Abhi
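One feature worth looking at here (this is a suggestion, not an answer from the thread) is Spark's fair scheduler with weighted pools: with spark.scheduler.mode=FAIR and pools defined in a fairscheduler.xml referenced by spark.scheduler.allocation.file, jobs submitted from different threads can be routed to pools with different weights. A minimal sketch, assuming pools named high and low exist and highPriorityRdd/lowPriorityRdd are placeholder RDDs:

// Jobs triggered from this thread go to the "high" pool.
sc.setLocalProperty("spark.scheduler.pool", "high")
highPriorityRdd.count()

// Switch the property before submitting lower-priority work.
sc.setLocalProperty("spark.scheduler.pool", "low")
lowPriorityRdd.count()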
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked (with spark.yarn.jar set), or is there a need to ensure that the entire distribution is made available (pre-deployed) on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago, here is a link to a post on Horton's forum. http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to perform this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull 1.2.1 Source 2. Apply the following patches a. Address jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in the spark-default.conf, https://github.com/apache/spark/pull/3409 3. Build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz following instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a yarn cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster, and the aforementioned Spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the Spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error? Thanks, Bharath
Re: Handling fatal errors of executors and decommission datanodes
Thanks Shixiong! Very strange that our tasks were retried on the same executor again and again. I'll check spark.scheduler.executorTaskBlacklistTime. Jianshi On Mon, Mar 16, 2015 at 6:02 PM, Shixiong Zhu zsxw...@gmail.com wrote: There are 2 cases for No space left on device: 1. Some tasks which use large temp space cannot run on any node. 2. The free space of datanodes is not balanced. Some tasks which use large temp space cannot run on several nodes, but they can run on other nodes successfully. Because most of our cases are the second one, we set spark.scheduler.executorTaskBlacklistTime to 30000 to solve such No space left on device errors. So if a task runs unsuccessfully in some executor, it won't be scheduled to the same executor within 30 seconds. Best Regards, Shixiong Zhu 2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com: I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, We're facing No space left on device errors lately from time to time. The job will fail after retries. Obviously, in such a case retrying won't be helpful. Sure, it's a problem in the datanodes, but I'm wondering if the Spark Driver can handle it and decommission the problematic datanode before retrying, and maybe dynamically allocate another datanode if dynamic allocation is enabled. I think there needs to be a class of fatal errors that can't be recovered with retries, and it's best if Spark can handle them nicely. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: insert hive partitioned table
I see. Since all Spark SQL queries must be issued from the driver side, you'll have to first collect all the values of interest to the driver side, and then use them to compose one or more insert statements. Cheng On 3/16/15 10:33 PM, patcharee wrote: I would like to insert into the table, and the value of the partition column to be inserted must come from the temporary registered table/dataframe. Patcharee On March 16, 2015 15:26, Cheng Lian wrote: Not quite sure whether I understand your question properly. But if you just want to read the partition columns, it's pretty easy. Take the "year" column as an example, you may do this in HiveQL: hiveContext.sql("SELECT year FROM speed") or in DataFrame DSL: hiveContext.table("speed").select("year") Cheng On 3/16/15 9:59 PM, patcharee wrote: Hi, I tried to insert into a hive partitioned table val ZONE: Int = Integer.valueOf(args(2)) val MONTH: Int = Integer.valueOf(args(3)) val YEAR: Int = Integer.valueOf(args(4)) val weightedUVToDF = weightedUVToRecord.toDF() weightedUVToDF.registerTempTable("speeddata") hiveContext.sql("INSERT OVERWRITE TABLE speed PARTITION (year=" + YEAR + ",month=" + MONTH + ",zone=" + ZONE + ") SELECT key, speed, direction FROM speeddata") First I registered a temporary table speeddata. The values of the partition columns (year, month, zone) come from user input. If I would like to get the values of the partition columns from the temporary table, how can I do that? BR, Patcharee
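A minimal sketch of what Cheng describes, assuming the temporary table actually carries year, month and zone columns (the column and table names follow the thread, but the code itself is an illustration, not from the original message):

// Collect the distinct partition values to the driver...
val partitions = hiveContext
  .sql("SELECT DISTINCT year, month, zone FROM speeddata")
  .collect()

// ...then issue one INSERT statement per partition value combination.
partitions.foreach { row =>
  val (year, month, zone) = (row.getInt(0), row.getInt(1), row.getInt(2))
  hiveContext.sql(
    s"INSERT OVERWRITE TABLE speed PARTITION (year=$year, month=$month, zone=$zone) " +
    s"SELECT key, speed, direction FROM speeddata " +
    s"WHERE year = $year AND month = $month AND zone = $zone")
}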
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Bharath, I ran into the same issue a few days ago, here is a link to a post on Horton's forum. http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to perform this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull 1.2.1 Source 2. Apply the following patches a. Address jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in the spark-default.conf, https://github.com/apache/spark/pull/3409 3. Build with $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz following instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a yarn cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster, and the aforementioned Spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the Spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error? Thanks, Bharath
Re: Processing of text file in large gzip archive
You probably want to update this line as follows: lines = sc.textFile('file.gz').repartition(sc.defaultParallelism * 3) For more details on why, see this answer http://stackoverflow.com/a/27631722/877069. Nick On Mon, Mar 16, 2015 at 6:50 AM Marius Soutier mps@gmail.com wrote: 1. I don't think textFile is capable of unpacking a .gz file. You need to use hadoopFile or newAPIHadoop file for this. Sorry that’s incorrect, textFile works fine on .gz files. What it can’t do is compute splits on gz files, so if you have a single file, you'll have a single partition. Processing 30 GB of gzipped data should not take that long, at least with the Scala API. Python not sure, especially under 1.2.1.
Re: Handling fatal errors of executors and decommission datanodes
Oh, by default it's set to 0L. I'll try setting it to 30000 immediately. Thanks for the help! Jianshi On Mon, Mar 16, 2015 at 11:32 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Thanks Shixiong! Very strange that our tasks were retried on the same executor again and again. I'll check spark.scheduler.executorTaskBlacklistTime. Jianshi On Mon, Mar 16, 2015 at 6:02 PM, Shixiong Zhu zsxw...@gmail.com wrote: There are 2 cases for No space left on device: 1. Some tasks which use large temp space cannot run on any node. 2. The free space of datanodes is not balanced. Some tasks which use large temp space cannot run on several nodes, but they can run on other nodes successfully. Because most of our cases are the second one, we set spark.scheduler.executorTaskBlacklistTime to 30000 to solve such No space left on device errors. So if a task runs unsuccessfully in some executor, it won't be scheduled to the same executor within 30 seconds. Best Regards, Shixiong Zhu 2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com: I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, We're facing No space left on device errors lately from time to time. The job will fail after retries. Obviously, in such a case retrying won't be helpful. Sure, it's a problem in the datanodes, but I'm wondering if the Spark Driver can handle it and decommission the problematic datanode before retrying, and maybe dynamically allocate another datanode if dynamic allocation is enabled. I think there needs to be a class of fatal errors that can't be recovered with retries, and it's best if Spark can handle them nicely. Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Scaling problem in RandomForest?
Try increasing the driver memory. We store trees on the driver node. If maxDepth=20 and numTrees=50, you may need a large driver memory to store all tree models. You might want to start with a smaller maxDepth and then increase it and see whether deep trees really help (vs. the cost). -Xiangrui On Wed, Mar 11, 2015 at 10:00 AM, insperatum inspera...@gmail.com wrote: Hi, the Random Forest implementation (1.2.1) is repeatedly crashing when I increase the depth to 20. I generate random synthetic data (36 workers, 1,000,000 examples per worker, 30 features per example) as follows: val data = sc.parallelize(1 to 36, 36).mapPartitionsWithIndex((i, _) => { Array.tabulate(100){ _ => new LabeledPoint(Math.random(), Vectors.dense(Array.fill(30)(math.random))) }.toIterator }).cache() ...and then train on a Random Forest with 50 trees, to depth 20: val strategy = new Strategy(Regression, Variance, 20, maxMemoryInMB = 1000) RandomForest.trainRegressor(data, strategy, 50, "sqrt", 1) ...and run on my EC2 cluster (36 slaves, master has 122GB of memory). After number crunching for a couple of hours, I get the following error: [sparkDriver-akka.actor.default-dispatcher-3] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:834) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 15/03/11 15:45:51 INFO scheduler.DAGScheduler: Job 92 failed: collectAsMap at DecisionTree.scala:653, took 46.062487 s -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-problem-in-RandomForest-tp22002.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Top rows per group
https://issues.apache.org/jira/browse/SPARK-5954 is for this issue and Shuo is working on it. We will first implement topByKey for RDDs and then we could add it to DataFrames. -Xiangrui On Mon, Mar 9, 2015 at 9:43 PM, Moss rhoud...@gmail.com wrote: I do have a schemaRDD where I want to group by a given field F1, but I want the result to be not a single row per group but multiple rows per group, where only the rows that have the top N F2 field values are kept. The issue is that the groupBy operation is an aggregation of multiple rows into a single one. Any suggestion or hint will be appreciated. Best, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Top-rows-per-group-tp21983.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
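Until topByKey lands, one workaround (an illustration, not from the thread) is to key the rows by F1 and keep a bounded buffer of the best N per key with aggregateByKey; the Record case class and its fields below are hypothetical stand-ins for the schemaRDD's columns:

import org.apache.spark.SparkContext._ // pair-RDD implicits for Spark 1.2
import org.apache.spark.rdd.RDD

case class Record(f1: String, f2: Double, payload: String)

def topNPerKey(records: RDD[Record], n: Int): RDD[Record] =
  records
    .map(r => (r.f1, r))
    .aggregateByKey(List.empty[Record])(
      (acc, r) => (r :: acc).sortBy(-_.f2).take(n), // fold a record into one partition's buffer
      (a, b) => (a ++ b).sortBy(-_.f2).take(n)      // merge buffers across partitions
    )
    .flatMap { case (_, top) => top }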
Re: MappedStream vs Transform API
It's mostly for legacy reasons. First we had added all the MappedDStream, etc. and then later we realized we need to expose something that is more generic for arbitrary RDD-RDD transformations. It can be easily replaced. However, there is a slight value in having MappedDStream, for developers to learn about DStreams. TD On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote: Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation uses a subclass when it can use an existing API? Unless there is a real difference, it feels like a code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think these two ways are both OK for you to write a streaming job. `transform` is a more general way for you to transform from one DStream to another if there's no related DStream API (but there is a related RDD API). But using map may be more straightforward and easy to understand. Thanks Jerry From: madhu phatak [mailto:phatak@gmail.com] Sent: Monday, March 16, 2015 4:32 PM To: user@spark.apache.org Subject: MappedStream vs Transform API Hi, The current implementation of the map function in Spark Streaming looks as below: def map[U: ClassTag](mapFunc: T => U): DStream[U] = { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } It creates an instance of MappedDStream, which is a subclass of DStream. The same function can also be implemented using the transform API: def map[U: ClassTag](mapFunc: T => U): DStream[U] = this.transform(rdd => { rdd.map(mapFunc) }) Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API? Regards, Madhukara Phatak http://datamantra.io/
Any IRC channel on Spark?
Hi, everyone, I'm wondering whether there is a possibility to set up an official IRC channel on freenode. I noticed that a lot of Apache projects have such a channel to let people talk directly. Best Michael
Basic GraphX deployment and usage question
Hi, I'm very new to Spark and GraphX. I downloaded and configured Spark on a cluster which uses Hadoop 1.x. The master UI shows all workers. The example command run-example SparkPi works fine and completes successfully. I'm interested in GraphX. Although the documentation says it is built in with Spark, I could not find any GraphX jar files under lib. I also wonder whether any of the algorithms mentioned in the GraphX programming guide are pre-compiled and available for testing. My main objective is to ensure that at least one correct graph application works with no errors using GraphX, before I start writing my own. -- Thanks, -Khaled
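[Editor's note] In prebuilt distributions GraphX classes ship inside the main Spark assembly rather than as a separate jar under lib, so a smoke test of the kind described above can be run directly from spark-shell. A minimal sketch (the three-edge graph is made up):

  import org.apache.spark.graphx.{Edge, Graph}

  // Tiny made-up edge list: (srcId, dstId, attr)
  val edges = sc.parallelize(Seq(
    Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)
  ))

  val graph = Graph.fromEdges(edges, defaultValue = 0)

  // Built-in PageRank; vertices gives (vertexId, rank) pairs
  val ranks = graph.pageRank(tol = 0.001).vertices
  ranks.collect().foreach(println)

If this prints three ranked vertices without errors, the GraphX deployment is working end to end.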
Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues
Hi Akhil, Yes, I did change both versions on the project and the cluster. Any clues? Even the sample code from Spark website failed to work. Thanks, Eason On Sun, Mar 15, 2015 at 11:56 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you change both the versions? The one in your build file of your project and the spark version of your cluster? Thanks Best Regards On Sat, Mar 14, 2015 at 6:47 AM, EH eas...@gmail.com wrote: Hi all, I've been using Spark 1.1.0 for a while, and now would like to upgrade to Spark 1.1.1 or above. However, it throws the following errors: 18:05:31.522 [sparkDriver-akka.actor.default-dispatcher-3hread] ERROR TaskSchedulerImpl - Lost executor 37 on hcompute001: remote Akka client disassociated 18:05:31.530 [sparkDriver-akka.actor.default-dispatcher-3hread] WARN TaskSetManager - Lost task 0.0 in stage 1.0 (TID 0, hcompute001): ExecutorLostFailure (executor lost) 18:05:31.567 [sparkDriver-akka.actor.default-dispatcher-2hread] ERROR TaskSchedulerImpl - Lost executor 3 on hcompute001: remote Akka client disassociated 18:05:31.568 [sparkDriver-akka.actor.default-dispatcher-2hread] WARN TaskSetManager - Lost task 1.0 in stage 1.0 (TID 1, hcompute001): ExecutorLostFailure (executor lost) 18:05:31.988 [sparkDriver-akka.actor.default-dispatcher-23hread] ERROR TaskSchedulerImpl - Lost executor 24 on hcompute001: remote Akka client disassociated Do you know what may go wrong? I didn't change any codes, just changed the version of Spark. Thank you all, Eason -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Upgrade-from-Spark-1-1-0-to-1-1-1-Issues-tp22045.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [SPARK-3638 ] java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.
From my local maven repo: $ jar tvf ~/.m2/repository/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar | grep SchemeRegistry 1373 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/impl/conn/SchemeRegistryFactory.class 2954 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/conn/scheme/SchemeRegistry.class 2936 Fri Apr 19 18:19:36 PDT 2013 org/apache/http/auth/AuthSchemeRegistry.class If you run mvn dependency:tree, you would see something similar to the following: [INFO] | +- org.apache.hadoop:hadoop-client:jar:2.6.0:compile [INFO] | | +- org.apache.hadoop:hadoop-common:jar:2.6.0:compile [INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile [INFO] | | | +- xmlenc:xmlenc:jar:0.52:compile [INFO] | | | +- commons-io:commons-io:jar:2.4:compile [INFO] | | | +- commons-collections:commons-collections:jar:3.2.1:compile [INFO] | | | +- commons-lang:commons-lang:jar:2.6:compile [INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile [INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile [INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile [INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile [INFO] | | | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile [INFO] | | | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8:compile [INFO] | | | +- org.apache.avro:avro:jar:1.7.6:compile [INFO] | | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile [INFO] | | | +- com.google.code.gson:gson:jar:2.2.4:compile [INFO] | | | +- org.apache.hadoop:hadoop-auth:jar:2.6.0:compile [INFO] | | | | +- org.apache.httpcomponents:httpclient:jar:4.2.5:compile Cheers On Mon, Mar 16, 2015 at 9:38 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I am running Spark 1.2.1 and AWS SDK. To make sure AWS compatible on the httpclient 4.2 (which I assume spark use?), I have already downgrade to the version 1.9.0 But even that, I still got an error: Exception in thread main java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.init(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:114) at org.apache.http.impl.conn.PoolingClientConnectionManager.init(PoolingClientConnectionManager.java:99) at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:29) at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:102) at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:190) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:119) at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:410) at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:392) at com.amazonaws.services.s3.AmazonS3Client.init(AmazonS3Client.java:376) When I search the maillist, it looks the same issue as: https://github.com/apache/spark/pull/2535 http://stackoverflow.com/questions/24788949/nosuchmethoderror-while-running-aws-s3-client-on-spark-while-javap-shows-otherwi But I don’t understand the solution mention here? The issue is caused by an pre-package DefaultClientConnectionOperator in the spark all-in-one jar file which doesn’t have the that method. 
I have some questions here: How can we find out which exact version Spark uses when it pre-packages everything (this is really very painful), and how can we override it? I have tried: val conf = new SparkConf() .set("spark.files.userClassPathFirst", "true") // for non-YARN apps before Spark 1.3 .set("spark.executor.userClassPathFirst", "true") // for Spark 1.3.0 But it doesn't work. This really creates a lot of issues for me (especially since we don't know which version Spark used to package its own jar, so we have to find out by trial and error). Even Maven doesn't give enough information, because httpclient is not under the Maven dependency tree (not even as an indirect dependency, after I used tools to resolve the whole dependency tree). Regards, Shuai
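[Editor's note] One way to answer the "which version did Spark actually package" question is to ask the JVM which jar the conflicting class was loaded from at runtime. A small sketch, using the class from the stack trace above (getCodeSource can be null for bootstrap classes, so treat this as a debugging aid rather than a guarantee):

  // Run from spark-shell or inside the job; prints the jar (Spark assembly or
  // application jar) that actually provided the class.
  val loc = classOf[org.apache.http.impl.conn.PoolingClientConnectionManager]
    .getProtectionDomain.getCodeSource.getLocation
  println(loc)

Comparing this location with and without the userClassPathFirst settings shows whether the override is taking effect.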
Creating a hive table on top of a parquet file written out by spark
Hi All, I wrote out a complex parquet file from Spark SQL and now I am trying to put a Hive table on top. I am running into issues with creating the Hive table itself. Here is the JSON that I wrote out to parquet using Spark SQL: {"user_id":"4513","providers":[{"id":"4220","name":"dbmvl","behaviors":{"b1":"gxybq","b2":"ntfmx"}},{"id":"4173","name":"dvjke","behaviors":{"b1":"sizow","b2":"knuuc"}}]} {"user_id":"3960","providers":[{"id":"1859","name":"ponsv","behaviors":{"b1":"ahfgc","b2":"txpea"}},{"id":"103","name":"uhqqo","behaviors":{"b1":"lktyo","b2":"ituxy"}}]} {"user_id":"567","providers":[{"id":"9622","name":"crjju","behaviors":{"b1":"rhaqc","b2":"npnot"}},{"id":"6965","name":"fnheh","behaviors":{"b1":"eipse","b2":"nvxqk"}}]} I basically created a hive context and read in the json file using jsonFile and then I wrote it back out using saveAsParquetFile. Afterwards I was trying to create a hive table on top of the parquet file. Here is the hive hql that I have: create table test (mycol STRUCT<user_id:String, providers:ARRAY<STRUCT<id:String, name:String, behaviors:MAP<String, String>>>>) stored as parquet; Alter table test set location 'hdfs:///tmp/test.parquet'; I get errors when I try to do a select * on the table: Failed with exception java.io.IOException: java.lang.IllegalStateException: Column mycol at index 0 does not exist in {providers=providers, user_id=user_id} -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-hive-table-on-top-of-a-parquet-file-written-out-by-spark-tp22084.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
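[Editor's note] The error message itself says the parquet file contains top-level columns user_id and providers (the schema Spark SQL inferred from the JSON), not a single wrapper struct named mycol, so the DDL needs to match that layout. A sketch of what the table definition might look like, issued through the same HiveContext and using the path from the question (older Hive/Spark parquet serdes can still have quirks with nested arrays, so this is illustrative, not a verified fix):

  hiveContext.sql("""
    CREATE EXTERNAL TABLE test (
      user_id STRING,
      providers ARRAY<STRUCT<id: STRING, name: STRING, behaviors: MAP<STRING, STRING>>>
    )
    STORED AS PARQUET
    LOCATION 'hdfs:///tmp/test.parquet'
  """)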
Re: Spark Streaming with compressed xml files
textFileStream and the default fileStream recognize the compressed xml (.xml.gz) files. Each line in the xml file is an element in RDD[String]. Then the whole RDD is converted to proper xml data and stored in a Scala variable. - I believe storing huge data in a Scala variable is inefficient. Is there any alternative processing for xml files? - How to create a Spark SQL table with the above xml data? Regards Vijay Innamuri On 16 March 2015 at 12:12, Akhil Das ak...@sigmoidanalytics.com wrote: One approach would be, if you are using fileStream you can access the individual filenames from the partitions and with that filename you can apply your uncompression/parsing logic and get it done. Like: UnionPartition upp = (UnionPartition) ds.values().getPartitions()[i]; NewHadoopPartition npp = (NewHadoopPartition) upp.split(); String fPath = npp.serializableHadoopSplit().value().toString(); Another approach would be to create a custom InputReader and InputFormat, then pass it along with your fileStream, and within the reader you do your uncompression/parsing etc. You can also look into XMLInputFormat https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java of Mahout. Thanks Best Regards On Mon, Mar 16, 2015 at 11:28 AM, Vijay Innamuri vijay.innam...@gmail.com wrote: Hi All, Processing streaming JSON files with Spark features (Spark Streaming and Spark SQL) is very efficient and works like a charm. Below is the code snippet to process JSON files. windowDStream.foreachRDD(IncomingFiles => { val IncomingFilesTable = sqlContext.jsonRDD(IncomingFiles); IncomingFilesTable.registerAsTable("IncomingFilesTable"); val result = sqlContext.sql("select text from IncomingFilesTable").collect; sc.parallelize(result).saveAsTextFile(filepath); }) But I feel it is difficult to use Spark features efficiently with streaming xml files (each compressed file would be 4 MB). What is the best approach for processing compressed xml files? Regards Vijay
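[Editor's note] As a rough illustration of keeping the parsed XML inside an RDD rather than a driver-side variable, here is a sketch that builds on the windowDStream snippet above. It assumes each batch holds the lines of one small XML file and that records sit under a <record> element with an id attribute; those element and attribute names are invented for the example:

  import scala.xml.XML

  windowDStream.foreachRDD { rdd =>
    if (rdd.take(1).nonEmpty) {
      // The files are small (~4 MB), so assembling one document on the driver is tolerable here.
      val doc = XML.loadString(rdd.collect().mkString("\n"))
      val records = (doc \\ "record").map(n => ((n \ "@id").text, n.text.trim))
      val recordsRDD = rdd.sparkContext.parallelize(records)
      // recordsRDD can now be mapped to a case class and registered as a table for Spark SQL.
    }
  }

For larger files, the custom InputFormat route Akhil mentions (or Mahout's XMLInputFormat) avoids collecting the document to the driver at all.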
Re: Priority queue in spark
Hi, Maybe this is what you are looking for : http://spark.apache.org/docs/1.2.0/job-scheduling.html#fair-scheduler-pools Thanks, On Mon, Mar 16, 2015 at 8:15 PM, abhi abhishek...@gmail.com wrote: Hi Current all the jobs in spark gets submitted using queue . i have a requirement where submitted job will generate another set of jobs with some priority , which should again be submitted to spark cluster based on priority ? Means job with higher priority should be executed first,Is it feasible ? Any help is appreciated ? Thanks, Abhi
Re: Iterate over contents of schemaRDD loaded from parquet file to extract timestamp
I don't see non-serializable objects in the provided snippets. But you can always add -Dsun.io.serialization.extendedDebugInfo=true to Java options to debug serialization errors. Cheng On 3/17/15 12:43 PM, anu wrote: Spark Version - 1.1.0 Scala - 2.10.4 I have loaded following type data from a parquet file, stored in a schemaRDD [7654321,2015-01-01 00:00:00.007,0.49,THU] Since, in spark version 1.1.0, parquet format doesn't support saving timestamp valuues, I have saved the timestamp data as string. Can you please tell me how to iterate over the data in this schema RDD to retrieve the timestamp values and regsietr the mapped RDD as a Table and then be able to run queries like Select * from table where time = '2015-01-01 00:00:00.000' . I wrote the following code : val sdf = new SimpleDateFormat(-mm-dd hh:mm:ss.SSS); val calendar = Calendar.getInstance() val iddRDD = intf_ddRDD.map{ r = val end_time = sdf.parse(r(1).toString); calendar.setTime(end_time); val r1 = new java.sql.Timestamp(end_time.getTime); val hour: Long = calendar.get(Calendar.HOUR_OF_DAY); Row(r(0).toString.toInt, r1, hour, r(2).toString.toInt, r(3).toString) } This gives me * org.apache.spark.SparkException: Task not serializable* Please help !!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Iterate-over-contents-of-schemaRDD-loaded-from-parquet-file-to-extract-timestamp-tp22089.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Priority queue in spark
If i understand correctly , the above document creates pool for priority which is static in nature and has to be defined before submitting the job . .in my scenario each generated task can have different priority. Thanks, Abhi On Mon, Mar 16, 2015 at 9:48 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Maybe this is what you are looking for : http://spark.apache.org/docs/1.2.0/job-scheduling.html#fair-scheduler-pools Thanks, On Mon, Mar 16, 2015 at 8:15 PM, abhi abhishek...@gmail.com wrote: Hi Current all the jobs in spark gets submitted using queue . i have a requirement where submitted job will generate another set of jobs with some priority , which should again be submitted to spark cluster based on priority ? Means job with higher priority should be executed first,Is it feasible ? Any help is appreciated ? Thanks, Abhi
Re: Priority queue in spark
http://apache-spark-developers-list.1001551.n3.nabble.com/Job-priority-td10076.html#a10079 On Mon, Mar 16, 2015 at 10:26 PM, abhi abhishek...@gmail.com wrote: If i understand correctly , the above document creates pool for priority which is static in nature and has to be defined before submitting the job . .in my scenario each generated task can have different priority. Thanks, Abhi On Mon, Mar 16, 2015 at 9:48 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Maybe this is what you are looking for : http://spark.apache.org/docs/1.2.0/job-scheduling.html#fair-scheduler-pools Thanks, On Mon, Mar 16, 2015 at 8:15 PM, abhi abhishek...@gmail.com wrote: Hi Current all the jobs in spark gets submitted using queue . i have a requirement where submitted job will generate another set of jobs with some priority , which should again be submitted to spark cluster based on priority ? Means job with higher priority should be executed first,Is it feasible ? Any help is appreciated ? Thanks, Abhi
Re: Priority queue in spark
yes . Each generated job can have a different priority it is like a recursive function, where in each iteration generate job will be submitted to the spark cluster based on the priority. jobs will lower priority or less than some threshold will be discarded. Thanks, Abhi On Mon, Mar 16, 2015 at 10:36 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi Abhi, You mean each task of a job can have different priority or job generated via one job can have different priority? On Tue, Mar 17, 2015 at 11:04 AM, Mark Hamstra m...@clearstorydata.com wrote: http://apache-spark-developers-list.1001551.n3.nabble.com/Job-priority-td10076.html#a10079 On Mon, Mar 16, 2015 at 10:26 PM, abhi abhishek...@gmail.com wrote: If i understand correctly , the above document creates pool for priority which is static in nature and has to be defined before submitting the job . .in my scenario each generated task can have different priority. Thanks, Abhi On Mon, Mar 16, 2015 at 9:48 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Maybe this is what you are looking for : http://spark.apache.org/docs/1.2.0/job-scheduling.html#fair-scheduler-pools Thanks, On Mon, Mar 16, 2015 at 8:15 PM, abhi abhishek...@gmail.com wrote: Hi Current all the jobs in spark gets submitted using queue . i have a requirement where submitted job will generate another set of jobs with some priority , which should again be submitted to spark cluster based on priority ? Means job with higher priority should be executed first,Is it feasible ? Any help is appreciated ? Thanks, Abhi
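[Editor's note] If the per-job priorities can be mapped onto a small set of fair-scheduler pools, the pool can still be chosen dynamically per submitting thread even though the pools themselves are defined up front. A minimal sketch, where the pool names and the threshold are hypothetical and must match entries in fairscheduler.xml:

  // The pool set here applies to all jobs launched from the calling thread
  // until it is cleared, so submit each generated job from its own thread.
  def submitWithPriority[T](sc: org.apache.spark.SparkContext, priority: Int)(job: => T): T = {
    val pool = if (priority > 5) "high" else "low"   // "high"/"low" are made-up pool names
    sc.setLocalProperty("spark.scheduler.pool", pool)
    try job
    finally sc.setLocalProperty("spark.scheduler.pool", null)
  }

  // Usage sketch: submitWithPriority(sc, priority = 8) { someRdd.count() }

Jobs below the threshold could simply not be submitted at all, which covers the "discard low priority" case.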
Iterate over contents of schemaRDD loaded from parquet file to extract timestamp
Spark Version - 1.1.0 Scala - 2.10.4 I have loaded the following type of data from a parquet file, stored in a schemaRDD: [7654321,2015-01-01 00:00:00.007,0.49,THU] Since, in Spark version 1.1.0, the parquet format doesn't support saving timestamp values, I have saved the timestamp data as a string. Can you please tell me how to iterate over the data in this schemaRDD to retrieve the timestamp values and register the mapped RDD as a table, and then be able to run queries like Select * from table where time = '2015-01-01 00:00:00.000'. I wrote the following code: val sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS"); val calendar = Calendar.getInstance() val iddRDD = intf_ddRDD.map{ r => val end_time = sdf.parse(r(1).toString); calendar.setTime(end_time); val r1 = new java.sql.Timestamp(end_time.getTime); val hour: Long = calendar.get(Calendar.HOUR_OF_DAY); Row(r(0).toString.toInt, r1, hour, r(2).toString.toInt, r(3).toString) } This gives me org.apache.spark.SparkException: Task not serializable. Please help!!! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Iterate-over-contents-of-schemaRDD-loaded-from-parquet-file-to-extract-timestamp-tp22089.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
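[Editor's note] A common way around Task not serializable here is to construct the date helpers inside the closure, once per partition, so nothing captured from the driver (or from an enclosing non-serializable class) needs to be shipped. A minimal sketch under that assumption; the column positions follow the sample row above, and the date pattern is reconstructed from that sample:

  import java.text.SimpleDateFormat
  import java.util.Calendar
  import org.apache.spark.sql._   // brings Row into scope in Spark 1.1

  val iddRDD = intf_ddRDD.mapPartitions { rows =>
    // Created on the executor, per partition, so they are never serialized from the driver.
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    val calendar = Calendar.getInstance()
    rows.map { r =>
      val endTime = sdf.parse(r(1).toString)
      calendar.setTime(endTime)
      val ts = new java.sql.Timestamp(endTime.getTime)
      val hour: Long = calendar.get(Calendar.HOUR_OF_DAY)
      Row(r(0).toString.toInt, ts, hour, r(2).toString.toInt, r(3).toString)
    }
  }

The debug flag Cheng suggests (-Dsun.io.serialization.extendedDebugInfo=true) will confirm which captured object was the actual culprit.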
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked(with spark.yarn.jar set), or is there a need to ensure that the entire distribution is deployed made available pre-deployed on every host in the yarn cluster? I'd assume that the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago, here is a link to a post on Horton's fourm. http://hortonworks.com/community/forums/search/spark+1.2.1/ Incase anyone else needs to perform this these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull 1.2.1 Source 2. Apply the following patches a. Address jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of the hdp.version set in the spark-default.conf, https://github.com/apache/spark/pull/3409 3. build with $SPARK_HOME./make-distribution.sh –name hadoop2.6 –tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact = spark-1.2.1-bin-hadoop2.6.tgz following instructions in the HDP Spark preview http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW spark-1.3.0 appears to be working fine with HDP as well and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run spark ( 1.2.1 built for hdp 2.2) against a yarn cluster results in the AM failing to start with following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the spark distribution has not been installed on every host in the cluster and the aforementioned spark build was copied to one of the hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the spark distribution to the HDP recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error ? Thanks, Bharath
Re: Saving Dstream into a single file
Each RDD has multiple partitions, each of them will produce one hdfs file when saving output. I don’t think you are allowed to have multiple file handler writing to the same hdfs file. You still can load multiple files into hive tables, right? Thanks.. Zhan Zhang On Mar 15, 2015, at 7:31 AM, tarek_abouzeid tarek.abouzei...@yahoo.com wrote: i am doing word count example on flume stream and trying to save output as text files in HDFS , but in the save directory i got multiple sub directories each having files with small size , i wonder if there is a way to append in a large file instead of saving in multiple files , as i intend to save the output in hive hdfs directory so i can query the result using hive hope anyone have a workaround for this issue , Thanks in advance -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Dstream-into-a-single-file-tp22058.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Can LBFGS be used on streaming data?
Hello, I am new to the Spark Streaming API. I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on streaming data? Currently I am using foreachRDD for parsing through the DStream and I am generating a model based on each RDD. Am I doing anything logically wrong here? Thank you. Sample Code: val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater()) var initialWeights = Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble())) var isFirst = true var model = new LinearRegressionModel(null, 1.0) parsedData.foreachRDD { rdd => if (isFirst) { val weights = algorithm.optimize(rdd, initialWeights) val w = weights.toArray val intercept = w.head model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept) isFirst = false } else { var ab = ArrayBuffer[Double]() ab.insert(0, model.intercept) ab.appendAll(model.weights.toArray) print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights) initialWeights = Vectors.dense(ab.toArray) print("Initial Weights: " + initialWeights) val weights = algorithm.optimize(rdd, initialWeights) val w = weights.toArray val intercept = w.head model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept) } } Best Regards, Arunkumar
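[Editor's note] For comparison, MLlib also ships a streaming regression model that performs the per-batch weight updates internally (using SGD rather than LBFGS). A minimal sketch, assuming parsedData is a DStream[LabeledPoint] and numFeatures is known, as in the code above:

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

  // The model keeps its own weights and refines them on every incoming batch.
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))

  model.trainOn(parsedData)                       // update weights per batch
  model.predictOn(parsedData.map(_.features)).print()

Mutating the LBFGS-trained model inside foreachRDD as in the original code can work, but the updates then happen on the driver, which is what the built-in streaming regression classes are designed to avoid having you manage by hand.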
Re: Querying JSON in Spark SQL
The programming guide has a short example: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets. Note that once you infer a schema for a JSON dataset, you can also use nested path notation (e.g. select user.name from users) in the same way as in Hive. Matei On Mar 16, 2015, at 4:47 PM, Fatma Ozcan fatma@gmail.com wrote: Is there any documentation that explains how to query JSON documents using SparkSQL? Thanks, Fatma
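[Editor's note] For quick reference, a minimal sketch of that documented flow; the file path and field names below are made up:

  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)

  // One JSON object per line, e.g. {"user": {"name": "alice", "age": 30}, "score": 7}
  val users = sqlContext.jsonFile("hdfs:///data/users.json")
  users.printSchema()                 // shows the inferred (possibly nested) schema
  users.registerTempTable("users")

  // Nested fields are addressed with dotted paths, as in Hive.
  sqlContext.sql("SELECT user.name, score FROM users WHERE score > 5").collect().foreach(println)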
RE: Process time series RDD after sortByKey
Hi Imran, I am a bit confused here. Assume I have RDD a with 1000 partition and also has been sorted. How can I control when creating RDD b (with 20 partitions) to make sure 1-50 partition of RDD a map to 1st partition of RDD b? I don’t see any control code/logic here? You code below: val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) Does it means I need to define/develop my own MyGroupingRDD class? I am not very clear how to do that, any place I can find an example? I never create my own RDD class before (not RDD instance J). But this is very valuable approach to me so I am desired to learn. Regards, Shuai From: Imran Rashid [mailto:iras...@cloudera.com] Sent: Monday, March 16, 2015 11:22 AM To: Shawn Zheng; user@spark.apache.org Subject: Re: Process time series RDD after sortByKey Hi Shuai, On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote: Sorry I response late. Zhan Zhang's solution is very interesting and I look at into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partition, the best case is I can run it as 20 subtask, each one take partition: 1-50, 51-100, 101-150, etc. If we have ability to do this, we will gain huge flexibility when we try to process some time series like data and a lot of algo will benefit from it. yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD -- it wont' create any tasks, its just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. Its just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions: val rawData1000Partitions = sc.textFile(...) // or whatever val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc. note that this is almost exactly the same as what CoalescedRdd does. However, it might combine the partitions in whatever ways it feels like -- you want them combined in a very particular order. So you'll need to create your own subclass. Back to Zhan Zhang's while( iterPartition RDD.partitions.length) { val res = sc.runJob(this, (it: Iterator[T]) = somFunc, iterPartition, allowLocal = true) Some other function after processing one partition. iterPartition += 1 } I am curious how spark process this without parallelism, the indidivual partition will pass back to driver to process or just run one task on that node which partition exist? then follow by another partition on another node? Not exactly. The partition is not shipped back to the driver. You create a task which will be processed by a worker. The task scheduling will take data locality into account, so ideally the task will get scheduled in the same location where the data already resides. The worker will execute someFunc, and after its done it will ship the *result* back to the driver. Then the process will get repeated for all the other partitions. 
If you wanted all the data sent back to the driver, you could use RDD.toLocalIterator. That will send one partition back to the driver, let you process it on the driver, then fetch the next partition, etc. Imran
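[Editor's note] To make the MyGroupingRDD idea described above concrete, here is a minimal sketch of an RDD subclass whose partition i depends on a contiguous block of parent partitions, preserving their order (class and field names are made up; preferred locations and error handling are omitted):

  import scala.reflect.ClassTag
  import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
  import org.apache.spark.rdd.RDD

  // Each child partition remembers which contiguous parent partitions it covers.
  class GroupedPartition(val index: Int, val parentIndices: Seq[Int]) extends Partition

  class ContiguousGroupingRDD[T: ClassTag](parent: RDD[T], numGroups: Int)
    extends RDD[T](parent.sparkContext, Nil) {

    private val groupSize = math.ceil(parent.partitions.length.toDouble / numGroups).toInt

    private def parentsOf(i: Int): Seq[Int] =
      (i * groupSize) until math.min((i + 1) * groupSize, parent.partitions.length)

    override def getPartitions: Array[Partition] =
      (0 until numGroups).map(i => new GroupedPartition(i, parentsOf(i)): Partition).toArray

    // Narrow dependency: child partition i reads only its own block of parent partitions.
    override def getDependencies: Seq[Dependency[_]] = Seq(
      new NarrowDependency(parent) {
        override def getParents(partitionId: Int): Seq[Int] = parentsOf(partitionId)
      }
    )

    override def compute(split: Partition, context: TaskContext): Iterator[T] =
      split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { pi =>
        parent.iterator(parent.partitions(pi), context)
      }
  }

This is essentially CoalescedRDD with the grouping forced to be contiguous, which is the ordering guarantee the time-series use case needs.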
Spark @ EC2: Futures timed out Ask timed out
Hi, I've been trying to run a simple SparkWordCount app on EC2, but it looks like my apps are not succeeding/completing. I'm suspecting some sort of communication issue. I used the SparkWordCount app from http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/ Digging through logs I found this: 15/03/16 21:28:20 INFO Utils: Successfully started service 'driverPropsFetcher' on port 58123. Exception in thread main java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1563) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) * Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] * at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:127) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) ... 4 more Or exceptions like: *Caused by: akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://sparkDriver@ip-10-111-222-111.ec2.internal:58360/), Path(/user/CoarseGrainedScheduler)]] after [3 ms] * at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) at java.lang.Thread.run(Thread.java:745) This is in EC2 and I have ports 22, 7077, 8080, and 8081 open to any source. But maybe I need to do something, too? I do see Master sees Workers and Workers do connect to the Master. I did run this in spark-shell, and it runs without problems; scala val something = sc.parallelize(1 to 1000).collect().filter(_1000 This is how I submitted the job (on the Master machine): $ spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --executor-memory 256m --master spark://ip-10-171-32-62:7077 wc-spark/target/sparkwordcount-0.0.1-SNAPSHOT.jar /usr/share/dict/words 0 Any help would be greatly appreciated. 
Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/
Re: Streaming linear regression example question
Tnx for the workaround. Margus (margusja) Roo http://margus.roo.ee skype: margusja +372 51 480 On 16/03/15 06:20, Jeremy Freeman wrote: Hi Margus, thanks for reporting this, I’ve been able to reproduce and there does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, can hopefully include in 1.3.1. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming with compressed xml files
One approach would be, If you are using fileStream you can access the individual filenames from the partitions and with that filename you can apply your uncompression logic/parsing logic and get it done. Like: UnionPartition upp = (UnionPartition) ds.values().getPartitions()[i]; NewHadoopPartition npp = (NewHadoopPartition) upp.split(); String *fPath* = npp.serializableHadoopSplit().value().toString(); Another approach would be to create a custom inputReader and InpurFormat, then pass it along with your fileStream and within the reader, you do your uncompression/parsing etc. You can also look into XMLInputFormat https://github.com/apache/mahout/blob/ad84344e4055b1e6adff5779339a33fa29e1265d/examples/src/main/java/org/apache/mahout/classifier/bayes/XmlInputFormat.java of mahout. Thanks Best Regards On Mon, Mar 16, 2015 at 11:28 AM, Vijay Innamuri vijay.innam...@gmail.com wrote: Hi All, Processing streaming JSON files with Spark features (Spark streaming and Spark SQL), is very efficient and works like a charm. Below is the code snippet to process JSON files. windowDStream.foreachRDD(IncomingFiles = { val IncomingFilesTable = sqlContext.jsonRDD(IncomingFiles); IncomingFilesTable.registerAsTable(IncomingFilesTable); val result = sqlContext.sql(select text from IncomingFilesTable).collect; sc.parallelize(result).saveAsTextFile(filepath); } But, I feel its difficult to use spark features efficiently with streaming xml files (each compressed file would be 4 MB). What is the best approach for processing compressed xml files? Regards Vijay
Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues
Did you change both the versions? The one in your build file of your project and the spark version of your cluster? Thanks Best Regards On Sat, Mar 14, 2015 at 6:47 AM, EH eas...@gmail.com wrote: Hi all, I've been using Spark 1.1.0 for a while, and now would like to upgrade to Spark 1.1.1 or above. However, it throws the following errors: 18:05:31.522 [sparkDriver-akka.actor.default-dispatcher-3hread] ERROR TaskSchedulerImpl - Lost executor 37 on hcompute001: remote Akka client disassociated 18:05:31.530 [sparkDriver-akka.actor.default-dispatcher-3hread] WARN TaskSetManager - Lost task 0.0 in stage 1.0 (TID 0, hcompute001): ExecutorLostFailure (executor lost) 18:05:31.567 [sparkDriver-akka.actor.default-dispatcher-2hread] ERROR TaskSchedulerImpl - Lost executor 3 on hcompute001: remote Akka client disassociated 18:05:31.568 [sparkDriver-akka.actor.default-dispatcher-2hread] WARN TaskSetManager - Lost task 1.0 in stage 1.0 (TID 1, hcompute001): ExecutorLostFailure (executor lost) 18:05:31.988 [sparkDriver-akka.actor.default-dispatcher-23hread] ERROR TaskSchedulerImpl - Lost executor 24 on hcompute001: remote Akka client disassociated Do you know what may go wrong? I didn't change any codes, just changed the version of Spark. Thank you all, Eason -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Upgrade-from-Spark-1-1-0-to-1-1-1-Issues-tp22045.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: org.apache.spark.SparkException Error sending message
Not sure if this will help, but can you try setting the following: set(spark.core.connection.ack.wait.timeout,6000) Thanks Best Regards On Sat, Mar 14, 2015 at 4:08 AM, Chen Song chen.song...@gmail.com wrote: When I ran Spark SQL query (a simple group by query) via hive support, I have seen lots of failures in map phase. I am not sure if that is specific to Spark SQL or general. Any one has seen such errors before? java.io.IOException: org.apache.spark.SparkException: Error sending message [message = GetLocations(broadcast_9_piece0)] java.io.IOException: org.apache.spark.SparkException: Error sending message [message = GetLocations(broadcast_9_piece0)] at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.SparkException: Error sending message [message = GetLocations(broadcast_9_piece0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.getLocations(BlockManagerMaster.scala:70) at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:592) at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:587) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136) at scala.Option.orElse(Option.scala:257) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090) ... 
12 more Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) ... 27 more -- Chen Song
Re: how to print RDD by key into file with grouByKey
If you want more partitions, then you have to specify it as: rdd.groupByKey(10).mapValues(...). I think if you don't specify anything, the number of partitions will be the number of cores that you have for processing. Thanks Best Regards On Sat, Mar 14, 2015 at 12:28 AM, Adrian Mocanu amoc...@verticalscope.com wrote: Hi I have an RDD: RDD[(String, scala.Iterable[(Long, Int)])] which I want to print into files, one file for each key string. I tried to trigger a repartition of the RDD by doing a group by on it. The grouping gives RDD[(String, scala.Iterable[Iterable[(Long, Int)]])] so I flattened that: rdd.groupByKey().mapValues(x => x.flatten) However, when I print with saveAsTextFile I get only 2 files. I was under the impression that groupBy repartitions the data by key and saveAsTextFile makes a file per partition. What am I doing wrong here? Thanks Adrian
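[Editor's note] If the goal is literally one output per key rather than one file per partition, a common pattern is a MultipleTextOutputFormat subclass with saveAsHadoopFile. A minimal sketch, with the class name and output layout chosen for illustration:

  import org.apache.hadoop.io.NullWritable
  import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
  import org.apache.spark.rdd.RDD

  // Routes each record into a subdirectory named after its key and drops the key
  // from the written value; the part-* suffix keeps writers from different partitions apart.
  class FilePerKeyOutput extends MultipleTextOutputFormat[Any, Any] {
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
      key.toString + "/" + name
    override def generateActualKey(key: Any, value: Any): Any =
      NullWritable.get()
  }

  def saveByKey(rdd: RDD[(String, Iterable[(Long, Int)])], path: String): Unit =
    rdd.flatMapValues(identity)              // one output line per (Long, Int) pair
       .mapValues(_.toString)
       .saveAsHadoopFile(path, classOf[String], classOf[String], classOf[FilePerKeyOutput])

Each key then gets its own directory under path, which is usually easier to manage than forcing every key into exactly one partition.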
Re: Running Scala Word Count Using Maven
Hello, So I actually solved the problem...see point 3. Here are a few approaches/errors I was getting: 1) mvn package exec:java -Dexec.mainClass=HelloWorld Error: java.lang.ClassNotFoundException: HelloWorld 2) http://stackoverflow.com/questions/26929100/running-a-scala-application-in-maven-project same error as above 3) The solution I found was here: http://www.scala-lang.org/old/node/345 It seems to be similar to 2, but the build seems to include a few more things. So it does seem that a few things have to be added to the pom for it to work for Scala; this was something I was a bit confused about after reading the book. Thanks for the help! Best, Su
Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
which location should i need to specify the classpath exactly . Thanks, On Mon, Mar 16, 2015 at 12:52 PM, Cheng, Hao hao.ch...@intel.com wrote: It doesn’t take effect if just putting jar files under the lib-managed/jars folder, you need to put that under class path explicitly. *From:* sandeep vura [mailto:sandeepv...@gmail.com] *Sent:* Monday, March 16, 2015 2:21 PM *To:* Cheng, Hao *Cc:* fightf...@163.com; Ted Yu; user *Subject:* Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient I have already added mysql-connector-xx.jar file in spark/lib-managed/jars directory. Regards, Sandeep.v On Mon, Mar 16, 2015 at 11:48 AM, Cheng, Hao hao.ch...@intel.com wrote: Or you need to specify the jars either in configuration or bin/spark-sql --jars mysql-connector-xx.jar *From:* fightf...@163.com [mailto:fightf...@163.com] *Sent:* Monday, March 16, 2015 2:04 PM *To:* sandeep vura; Ted Yu *Cc:* user *Subject:* Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi, Sandeep From your error log I can see that jdbc driver not found in your classpath. Did you had your mysql jdbc jar correctly configured in the specific classpath? Can you establish a hive jdbc connection using the url : jdbc:hive2://localhost:1 ? Thanks, Sun. -- fightf...@163.com *From:* sandeep vura sandeepv...@gmail.com *Date:* 2015-03-16 14:13 *To:* Ted Yu yuzhih...@gmail.com *CC:* user@spark.apache.org *Subject:* Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi Ted, Did you find any solution. Thanks Sandeep On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Ted, I am using Spark -1.2.1 and hive -0.13.1 you can check my configuration files attached below. ERROR IN SPARK n: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkS QLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQ LCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h ive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(Retry ingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret ryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.ja va:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:340) ... 
9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct orAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC onstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:534) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1410) ... 14 more Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional con nection factory NestedThrowables: java.lang.reflect.InvocationTargetException at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusExc eption(NucleusJDOHelper.java:587) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfigurat ion(JDOPersistenceManagerFactory.java:788) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenc eManagerFactory(JDOPersistenceManagerFactory.java:333) at
Re: Question about Spark Streaming Receiver Failure
Dibyendu, Thanks for the reply. I am reading your project homepage now. One quick question I care about is: If the receivers failed for some reasons(for example, killed brutally by someone else), is there any mechanism for the receiver to fail over automatically? On Mon, Mar 16, 2015 at 3:25 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Which version of Spark you are running ? You can try this Low Level Consumer : http://spark-packages.org/package/dibbhatt/kafka-spark-consumer This is designed to recover from various failures and have very good fault recovery mechanism built in. This is being used by many users and at present we at Pearson running this Receiver in Production for almost 3 months without any issue. You can give this a try. Regards, Dibyendu On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to figure out why the receivers failed in the first place. Look in your worker logs and see what really happened. When you run a streaming job continuously for longer period mostly there'll be a lot of logs (you can enable log rotation etc.) and if you are doing a groupBy, join, etc type of operations, then there will be a lot of shuffle data. So You need to check in the worker logs and see what happened (whether DISK full etc.), We have streaming pipelines running for weeks without having any issues. Thanks Best Regards On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote: Guys, We have a project which builds upon Spark streaming. We use Kafka as the input stream, and create 5 receivers. When this application runs for around 90 hour, all the 5 receivers failed for some unknown reasons. In my understanding, it is not guaranteed that Spark streaming receiver will do fault recovery automatically. So I just want to figure out a way for doing fault-recovery to deal with receiver failure. There is a JIRA post mentioned using StreamingLister for monitoring the status of receiver: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However I haven't found any open doc about how to do this stuff. Any guys have met the same issue and deal with it? Our environment: Spark 1.3.0 Dual Master Configuration Kafka 0.8.2 Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro -- yangjun...@gmail.com http://hi.baidu.com/yjpro
Spark Streaming with compressed xml files
Hi All, Processing streaming JSON files with Spark features (Spark streaming and Spark SQL), is very efficient and works like a charm. Below is the code snippet to process JSON files. windowDStream.foreachRDD(IncomingFiles = { val IncomingFilesTable = sqlContext.jsonRDD(IncomingFiles); IncomingFilesTable.registerAsTable(IncomingFilesTable); val result = sqlContext.sql(select text from IncomingFilesTable).collect; sc.parallelize(result).saveAsTextFile(filepath); } But, I feel its difficult to use spark features efficiently with streaming xml files (each compressed file would be 4 MB). What is the best approach for processing compressed xml files? Regards Vijay
Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi Fightfate, I have attached my hive-site.xml file in the previous mail.Please check the configuration once. In hive i am able to create tables and also able to load data into hive table. Please find the attached file. Regards, Sandeep.v On Mon, Mar 16, 2015 at 11:34 AM, fightf...@163.com fightf...@163.com wrote: Hi, Sandeep From your error log I can see that jdbc driver not found in your classpath. Did you had your mysql jdbc jar correctly configured in the specific classpath? Can you establish a hive jdbc connection using the url : jdbc:hive2://localhost:1 ? Thanks, Sun. -- fightf...@163.com *From:* sandeep vura sandeepv...@gmail.com *Date:* 2015-03-16 14:13 *To:* Ted Yu yuzhih...@gmail.com *CC:* user@spark.apache.org *Subject:* Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi Ted, Did you find any solution. Thanks Sandeep On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Ted, I am using Spark -1.2.1 and hive -0.13.1 you can check my configuration files attached below. ERROR IN SPARK n: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkS QLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQ LCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h ive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(Retry ingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret ryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.ja va:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:340) ... 9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct orAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC onstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:534) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1410) ... 
14 more Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional con nection factory NestedThrowables: java.lang.reflect.InvocationTargetException at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusExc eption(NucleusJDOHelper.java:587) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfigurat ion(JDOPersistenceManagerFactory.java:788) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenc eManagerFactory(JDOPersistenceManagerFactory.java:333) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceMa nagerFactory(JDOPersistenceManagerFactory.java:202) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(Native Method) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementatio n(JDOHelper.java:1166) at
why generateJob is a private API?
Hi, I am trying to create a simple subclass of DStream. If I understand correctly, I should override *compute *lazy operations and *generateJob* for actions. But when I try to override, generateJob it gives error saying method is private to the streaming package. Is my approach is correct or am I missing something? Regards, Madhukara Phatak http://datamantra.io/
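[Editor's note] A custom DStream normally only needs to override compute (plus dependencies and slideDuration); generateJob stays with the base class, and output actions come from the existing operators such as foreachRDD. A minimal sketch of what that subclass might look like (the class name and transformation are invented; built-in DStreams use the package-private getOrCompute on the parent, so user code either calls compute directly as here or simply builds on transform):

  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Duration, Time}
  import org.apache.spark.streaming.dstream.DStream

  class MappedOnceDStream[T: ClassTag, U: ClassTag](parent: DStream[T], f: T => U)
    extends DStream[U](parent.context) {

    override def dependencies: List[DStream[_]] = List(parent)
    override def slideDuration: Duration = parent.slideDuration

    // Per-batch RDD generation; job creation itself is inherited from DStream.
    override def compute(validTime: Time): Option[RDD[U]] =
      parent.compute(validTime).map(_.map(f))
  }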
RE: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Or you need to specify the jars either in configuration or bin/spark-sql --jars mysql-connector-xx.jar From: fightf...@163.com [mailto:fightf...@163.com] Sent: Monday, March 16, 2015 2:04 PM To: sandeep vura; Ted Yu Cc: user Subject: Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi, Sandeep From your error log I can see that jdbc driver not found in your classpath. Did you had your mysql jdbc jar correctly configured in the specific classpath? Can you establish a hive jdbc connection using the url : jdbc:hive2://localhost:1 ? Thanks, Sun. fightf...@163.commailto:fightf...@163.com From: sandeep vuramailto:sandeepv...@gmail.com Date: 2015-03-16 14:13 To: Ted Yumailto:yuzhih...@gmail.com CC: user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi Ted, Did you find any solution. Thanks Sandeep On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura sandeepv...@gmail.commailto:sandeepv...@gmail.com wrote: Hi Ted, I am using Spark -1.2.1 and hive -0.13.1 you can check my configuration files attached below. ERROR IN SPARK n: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkS QLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQ LCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl. java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAcces sorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h ive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(Retry ingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret ryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.ja va:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav a:340) ... 9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct orAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC onstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:534) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore Utils.java:1410) ... 14 more Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional con nection factory NestedThrowables: java.lang.reflect.InvocationTargetException at
Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
I have already added the mysql-connector-xx.jar file in the spark/lib-managed/jars directory. Regards, Sandeep.v On Mon, Mar 16, 2015 at 11:48 AM, Cheng, Hao hao.ch...@intel.com wrote: Or you need to specify the jars either in the configuration or via bin/spark-sql --jars mysql-connector-xx.jar From: fightf...@163.com [mailto:fightf...@163.com] Sent: Monday, March 16, 2015 2:04 PM To: sandeep vura; Ted Yu Cc: user Subject: Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi, Sandeep From your error log I can see that the jdbc driver is not found in your classpath. Did you have your mysql jdbc jar correctly configured in the specific classpath? Can you establish a hive jdbc connection using the url jdbc:hive2://localhost:1 ? Thanks, Sun. -- fightf...@163.com From: sandeep vura sandeepv...@gmail.com Date: 2015-03-16 14:13 To: Ted Yu yuzhih...@gmail.com CC: user@spark.apache.org Subject: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi Ted, Did you find any solution? Thanks Sandeep On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Ted, I am using Spark 1.2.1 and Hive 0.13.1. You can check my configuration files attached below. ERROR IN SPARK: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 9 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:534)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
... 14 more
Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
NestedThrowables: java.lang.reflect.InvocationTargetException
at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at
Re: Need Advice about reading lots of text files
Hi, Internally Spark uses the HDFS API to handle file data. Have a look at HAR and the sequence file input format. There is more information on this Cloudera blog: http://blog.cloudera.com/blog/2009/02/the-small-files-problem/. Regards, Madhukara Phatak http://datamantra.io/ On Sun, Mar 15, 2015 at 9:59 PM, Pat Ferrel p...@occamsmachete.com wrote: Ah, most interesting - thanks. So it seems sc.textFile(longFileList) has to read all the metadata before starting the read for partitioning purposes, so what you do is avoid using it? You create a task per file that reads one file (in parallel) per task without scanning for _all_ metadata. Can’t argue with the logic, but perhaps Spark should incorporate something like this in sc.textFile? My case can’t be that unusual, especially since I am periodically processing micro-batches from Spark Streaming. Actually I have to scan HDFS to create the longFileList to begin with, so I get file status and therefore probably all the metadata needed by sc.textFile. Your method would save one scan, which is good. Might a better sc.textFile take a beginning URI, a file pattern regex, and a recursive flag? Then one scan could create all the metadata automatically for a large subset of people using the function, something like sc.textFile(beginDir: String, filePattern: String = "^part.*", recursive: Boolean = false). In fact it should be easy to create a BetterSC that overrides the textFile method with a re-implementation that only requires one scan to get metadata. Just thinking on email… On Mar 14, 2015, at 11:11 AM, Michael Armbrust mich...@databricks.com wrote: Here is how I have dealt with many small text files (on S3, though this should generalize) in the past: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E From: Michael Armbrust mich...@databricks.com Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile Date: Thu, 27 Nov 2014 03:20:14 GMT In the past I have worked around this problem by avoiding sc.textFile(). Instead I read the data directly inside of a Spark job. Basically, you start with an RDD where each entry is a file in S3 and then flatMap that with something that reads the files and returns the lines. Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe Using this class you can do something like: sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file1" ... :: Nil).flatMap(new ReadLinesSafe(_)) You can also build up the list of files by running a Spark job: https://gist.github.com/marmbrus/15e72f7bc22337cf6653 Michael On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com wrote: It’s a long story but there are many dirs with smallish part- files in them, so we create a list of the individual files as input to sparkContext.textFile(fileList). I suppose we could move them and rename them to be contiguous part- files in one dir. Would that be better than passing in a long list of individual filenames? We could also make the part files much larger by collecting the smaller ones. But would any of this make a difference in IO speed? I ask because using the long file list seems to read what amounts to a not very large data set rather slowly. If it were all in large part files in one dir I’d expect it to go much faster, but this is just intuition. On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote: why can you not put them in a directory and read them as one input?
you will get a task per file, but spark is very fast at executing many tasks (it's not a jvm per task). On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote: Any advice on dealing with a large number of separate input files? On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote: We have many text files that we need to read in parallel. We can create a comma-delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 1) and is all on hdfs. The question is: what is the most performant way to read them? Should they be broken up and read in groups, appending the resulting RDDs, or should we just pass in the entire list at once? In effect I’m asking whether Spark does some optimization or whether we should do it explicitly. If the latter, what rule might we use depending on our cluster setup?
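For reference, a minimal sketch of the read-inside-the-job approach Michael describes above (the file list and paths below are made up; in practice the list would come from the HDFS scan Pat mentions), giving one task per file without sc.textFile scanning all the metadata up front:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source
// Hypothetical file list, purely for illustration.
val files: Seq[String] = Seq("hdfs:///data/day1/part-00000", "hdfs:///data/day1/part-00001")
// One partition per file; each task opens its own file and returns its lines.
val lines = sc.parallelize(files, files.size).flatMap { p =>
  val path = new Path(p)
  val fs = FileSystem.get(path.toUri, new Configuration())
  val in = fs.open(path)
  try Source.fromInputStream(in).getLines().toVector finally in.close()
}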
Re: Spark will process _temporary folder on S3 is very slow and always cause failure
If you use fileStream, there's an option to filter out files. In your case you can easily create a filter to remove _temporary files. In that case you will have to move your code inside foreachRDD of the dstream, since the application will become a streaming app. Thanks Best Regards On Sat, Mar 14, 2015 at 4:26 AM, Shuai Zheng szheng.c...@gmail.com wrote: And one thing I forgot to mention: even though I have this exception, the result is not well formatted in my target folder (part of the files are there, the rest are under the different folder structure of the _temporary folder). In the web UI of spark-shell, it is still marked as a successful step. I think this is a bug? Regards, Shuai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Friday, March 13, 2015 6:51 PM To: user@spark.apache.org Subject: Spark will process _temporary folder on S3 is very slow and always cause failure Hi All, I am trying to run a sort on an r3.2xlarge instance on AWS. I just run it as a single-node cluster for a test. The data I use to sort is around 4GB and sits on S3; the output will also be on S3. I just connect spark-shell to the local cluster and run the code in the script (because I just want a benchmark now). My job is as simple as: val parquetFile = sqlContext.parquetFile("s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...") parquetFile.registerTempTable("Test") val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") } sortedResult.saveAsTextFile("s3n://myplace") The job takes around 6 mins to finish the sort when I am monitoring the process. After that I notice the process stops at: 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at <console>:31, took 581.304992 s At that time, Spark actually just writes all the data to the _temporary folder first; after all sub-tasks finish, it tries to move all the ready results from the _temporary folder to the final location. This process might be quick locally (because it is just a cut/paste), but it looks very slow on my S3: it takes a few seconds to move one file (usually there will be 200 partitions). And then it raises exceptions after it has moved maybe 40-50 files. org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)
I tried several times, but never got the full job to finish. I am not sure what is wrong here; I use something very basic, and I can see the job has finished and all the results are on S3 under the _temporary folder, but then it raises the exception and fails. Is there any special setting I should use here when dealing with S3? I don’t know what the issue is here; I have never seen MapReduce have a similar issue, so it could not be S3’s problem. Regards, Shuai
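A minimal sketch of the fileStream-with-filter idea from the reply above, under the assumption that the job is rewritten as a streaming app; the path, batch interval, and per-batch processing below are placeholders:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(60))
// Skip anything that is still under a _temporary directory.
val notTemporary = (p: Path) => !p.toString.contains("_temporary")
val stream = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3n://myplace/", notTemporary, newFilesOnly = true)
// As noted above, the per-batch work then lives inside foreachRDD.
stream.map(_._2.toString).foreachRDD(rdd => println(s"read ${rdd.count()} lines"))
ssc.start()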
Question about Spark Streaming Receiver Failure
Guys, We have a project which builds upon Spark Streaming. We use Kafka as the input stream and create 5 receivers. When this application had run for around 90 hours, all 5 receivers failed for some unknown reason. In my understanding, it is not guaranteed that a Spark Streaming receiver will do fault recovery automatically. So I just want to figure out a way of doing fault recovery to deal with receiver failure. There is a JIRA comment that mentions using a StreamingListener for monitoring the status of a receiver: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However I haven't found any open doc about how to do this. Has anyone met the same issue and dealt with it? Our environment: Spark 1.3.0, dual master configuration, Kafka 0.8.2. Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
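For reference, a minimal sketch of the StreamingListener approach mentioned in that JIRA comment, assuming ssc is the application's StreamingContext; the println calls are just placeholders for whatever alerting or fail-over logic is wanted:
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}
class ReceiverMonitor extends StreamingListener {
  // Called when a receiver reports an error.
  override def onReceiverError(e: StreamingListenerReceiverError): Unit =
    println(s"Receiver ${e.receiverInfo.name} error: ${e.receiverInfo.lastErrorMessage}")
  // Called when a receiver stops.
  override def onReceiverStopped(e: StreamingListenerReceiverStopped): Unit =
    println(s"Receiver ${e.receiverInfo.name} stopped")
}
ssc.addStreamingListener(new ReceiverMonitor)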
Does spark-1.3.0 support the analytic functions defined in Hive, such as row_number, rank
Hi all, I'm wondering whether the latest Spark 1.3.0 supports the windowing and analytic functions in Hive, such as row_number, rank, etc. Indeed, I've done some testing using spark-shell and found that row_number is not supported yet. But I still found that there are some test cases related to row_number and other analytic functions. These test cases are defined in sql/hive/target/scala-2.10/test-classes/ql/src/test/queries/clientpositive/windowing_multipartitioning.q So my question has two parts: one is whether the analytic functions are supported or not; the other is, if they are not supported, why there are still such test cases. hseagle -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-spark-1-3-0-support-the-analytic-functions-defined-in-Hive-such-as-row-number-rank-tp22072.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
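For reference, the sort of query those .q test files exercise looks like the sketch below (the table and column names are made up purely to show the syntax being asked about); per the spark-shell testing described above, this is not accepted by Spark SQL 1.3:
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
// Hypothetical table and columns, only to illustrate the row_number() OVER (...) syntax.
hc.sql("SELECT user_id, ts, row_number() OVER (PARTITION BY user_id ORDER BY ts) AS rn FROM events")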
How to set Spark executor memory?
Hi, I have set spark.executor.memory to 2048m, and on the UI Environment page I can see this value has been set correctly. But on the Executors page I see there's only 1 executor and its memory is 265.4MB. A very strange value. Why not 256MB, or just what I set? What am I missing here? Thanks, David
Re: How to set Spark executor memory?
How are you setting it, and how are you submitting the job? Thanks Best Regards On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have set spark.executor.memory to 2048m, and in the UI Environment page, I can see this value has been set correctly. But in the Executors page, I saw there's only 1 executor and its memory is 265.4MB. Very strange value. why not 256MB, or just as what I set? What am I missing here? Thanks, David
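For what it's worth, a small sketch of the two usual ways to set it (the 2g value is just an example): on the command line with spark-submit --executor-memory 2g, or in code before the SparkContext is created:
import org.apache.spark.{SparkConf, SparkContext}
// Must be set before the SparkContext exists; changing it afterwards has no effect.
val conf = new SparkConf().setAppName("example").set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
Note that when everything runs in local mode there is no separate executor JVM, so the driver memory setting (--driver-memory) is usually the one that actually matters there.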
Re: Question about Spark Streaming Receiver Failure
Akhil, I have checked the logs. There isn't any clue as to why the 5 receivers failed. That's why I take it that receiver failures will be a common issue, and we need to figure out a way to detect this kind of failure and fail over. Thanks On Mon, Mar 16, 2015 at 3:17 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to figure out why the receivers failed in the first place. Look in your worker logs and see what really happened. When you run a streaming job continuously for longer period mostly there'll be a lot of logs (you can enable log rotation etc.) and if you are doing a groupBy, join, etc type of operations, then there will be a lot of shuffle data. So You need to check in the worker logs and see what happened (whether DISK full etc.), We have streaming pipelines running for weeks without having any issues. Thanks Best Regards On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote: Guys, We have a project which builds upon Spark streaming. We use Kafka as the input stream, and create 5 receivers. When this application runs for around 90 hour, all the 5 receivers failed for some unknown reasons. In my understanding, it is not guaranteed that Spark streaming receiver will do fault recovery automatically. So I just want to figure out a way for doing fault-recovery to deal with receiver failure. There is a JIRA post mentioned using StreamingLister for monitoring the status of receiver: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However I haven't found any open doc about how to do this stuff. Any guys have met the same issue and deal with it? Our environment: Spark 1.3.0 Dual Master Configuration Kafka 0.8.2 Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
Re: k-means hang without error/warning
How many threads are you allocating while creating the SparkContext? For example, local[4] will allocate 4 threads. You can try increasing it to a higher number, and also try setting the level of parallelism to a higher number. Thanks Best Regards On Mon, Mar 16, 2015 at 9:55 AM, Xi Shen davidshe...@gmail.com wrote: Hi, I am running k-means using Spark in local mode. My data set is about 30k records, and I set k = 1000. The algorithm starts and finishes 13 jobs according to the UI monitor, then it stops working. The last log I saw was: [Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned broadcast 16 There are many similar log lines repeated, but it seems it always stops at the 16th. If I lower the k value, the algorithm terminates. So I just want to know what's wrong with k=1000. Thanks, David
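A small sketch of the two knobs mentioned in the reply, assuming the job is launched from code; the thread count, partition count, file name, and iteration count below are arbitrary examples:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
val conf = new SparkConf().setAppName("kmeans-local").setMaster("local[8]") // more threads than the default
val sc = new SparkContext(conf)
// The second argument to textFile controls the number of partitions, i.e. the level of parallelism.
val data = sc.textFile("data.txt", 8).map(line => Vectors.dense(line.split(' ').map(_.toDouble))).cache()
val model = KMeans.train(data, k = 1000, maxIterations = 20)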
Re: Question about Spark Streaming Receiver Failure
You need to figure out why the receivers failed in the first place. Look in your worker logs and see what really happened. When you run a streaming job continuously for a long period there will usually be a lot of logs (you can enable log rotation etc.), and if you are doing groupBy, join, etc. types of operations, then there will be a lot of shuffle data. So you need to check in the worker logs and see what happened (whether the disk is full etc.). We have streaming pipelines running for weeks without having any issues. Thanks Best Regards On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote: Guys, We have a project which builds upon Spark streaming. We use Kafka as the input stream, and create 5 receivers. When this application runs for around 90 hour, all the 5 receivers failed for some unknown reasons. In my understanding, it is not guaranteed that Spark streaming receiver will do fault recovery automatically. So I just want to figure out a way for doing fault-recovery to deal with receiver failure. There is a JIRA post mentioned using StreamingLister for monitoring the status of receiver: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However I haven't found any open doc about how to do this stuff. Any guys have met the same issue and deal with it? Our environment: Spark 1.3.0 Dual Master Configuration Kafka 0.8.2 Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
RE: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
It doesn’t take effect if you just put jar files under the lib-managed/jars folder; you need to put them on the classpath explicitly. From: sandeep vura [mailto:sandeepv...@gmail.com] Sent: Monday, March 16, 2015 2:21 PM To: Cheng, Hao Cc: fightf...@163.com; Ted Yu; user Subject: Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient I have already added the mysql-connector-xx.jar file in the spark/lib-managed/jars directory. Regards, Sandeep.v On Mon, Mar 16, 2015 at 11:48 AM, Cheng, Hao hao.ch...@intel.com wrote: Or you need to specify the jars either in the configuration or via bin/spark-sql --jars mysql-connector-xx.jar From: fightf...@163.com [mailto:fightf...@163.com] Sent: Monday, March 16, 2015 2:04 PM To: sandeep vura; Ted Yu Cc: user Subject: Re: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi, Sandeep From your error log I can see that the jdbc driver is not found in your classpath. Did you have your mysql jdbc jar correctly configured in the specific classpath? Can you establish a hive jdbc connection using the url jdbc:hive2://localhost:1 ? Thanks, Sun. fightf...@163.com From: sandeep vura sandeepv...@gmail.com Date: 2015-03-16 14:13 To: Ted Yu yuzhih...@gmail.com CC: user@spark.apache.org Subject: Re: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient Hi Ted, Did you find any solution? Thanks Sandeep On Mon, Mar 16, 2015 at 10:44 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Ted, I am using Spark 1.2.1 and Hive 0.13.1. You can check my configuration files attached below. ERROR IN SPARK: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
... 9 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingC
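For the record, a hedged sketch of the two ways suggested in this thread to get the driver jar onto the classpath (the jar name and path below are placeholders, not the exact file in use here): either pass it at launch time, e.g. bin/spark-sql --jars /path/to/mysql-connector-java-x.y.z.jar, or add it to conf/spark-defaults.conf so both the driver and the executors pick it up:
spark.driver.extraClassPath   /path/to/mysql-connector-java-x.y.z.jar
spark.executor.extraClassPath /path/to/mysql-connector-java-x.y.z.jar
Either way the setting has to be supplied when the application starts; as noted above, simply dropping the jar into lib-managed/jars is not picked up.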
Re: Question about Spark Streaming Receiver Failure
Which version of Spark are you running? You can try this low-level consumer: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer It is designed to recover from various failures and has a very good fault recovery mechanism built in. It is being used by many users, and at present we at Pearson have been running this receiver in production for almost 3 months without any issue. You can give it a try. Regards, Dibyendu On Mon, Mar 16, 2015 at 12:47 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to figure out why the receivers failed in the first place. Look in your worker logs and see what really happened. When you run a streaming job continuously for longer period mostly there'll be a lot of logs (you can enable log rotation etc.) and if you are doing a groupBy, join, etc type of operations, then there will be a lot of shuffle data. So You need to check in the worker logs and see what happened (whether DISK full etc.), We have streaming pipelines running for weeks without having any issues. Thanks Best Regards On Mon, Mar 16, 2015 at 12:40 PM, Jun Yang yangjun...@gmail.com wrote: Guys, We have a project which builds upon Spark streaming. We use Kafka as the input stream, and create 5 receivers. When this application runs for around 90 hour, all the 5 receivers failed for some unknown reasons. In my understanding, it is not guaranteed that Spark streaming receiver will do fault recovery automatically. So I just want to figure out a way for doing fault-recovery to deal with receiver failure. There is a JIRA post mentioned using StreamingLister for monitoring the status of receiver: https://issues.apache.org/jira/browse/SPARK-2381?focusedCommentId=14056836page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14056836 However I haven't found any open doc about how to do this stuff. Any guys have met the same issue and deal with it? Our environment: Spark 1.3.0 Dual Master Configuration Kafka 0.8.2 Thanks -- yangjun...@gmail.com http://hi.baidu.com/yjpro
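For context, the stock receiver-based setup described in the original question (five Kafka receivers unioned into one stream) typically looks something like the sketch below; the ZooKeeper address, consumer group, and topic are placeholders, and the spark-streaming-kafka dependency plus an ssc StreamingContext are assumed. The packaged consumer linked above supplies its own receiver in place of the KafkaUtils.createStream calls.
import org.apache.spark.streaming.kafka.KafkaUtils
val topics = Map("mytopic" -> 1) // topic -> threads per receiver
// Five receiver-based input streams, combined into a single DStream.
val streams = (1 to 5).map(_ => KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", topics))
val unioned = ssc.union(streams)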