Re: Spark and S3 server side encryption
Adding on to what Thomas said: there have been a few bug fixes for s3a since Hadoop 2.6.0 was released. One example is HADOOP-11446. The fixes will be in Hadoop 2.7.0.

Cheers

On Jan 27, 2015, at 1:41 AM, Thomas Demoor thomas.dem...@amplidata.com wrote:

Spark uses the Hadoop filesystems. I assume you are trying to use s3n://, which, under the hood, uses the third-party jets3t library. It is configured through the jets3t.properties file (google "hadoop s3n jets3t"), which you should put on Spark's classpath. The setting you are looking for is s3service.server-side-encryption.

The latest version of Hadoop (2.6) introduces a new and improved s3a:// filesystem which has the official SDK from Amazon under the hood.

On Mon, Jan 26, 2015 at 10:01 PM, curtkohler c.koh...@elsevier.com wrote:

We are trying to create a Spark job that writes out a file to S3 that leverages S3's server side encryption for sensitive data. Typically this is accomplished by setting the appropriate header on the put request, but it isn't clear whether this capability is exposed in the Spark/Hadoop APIs. Does anyone have any suggestions?
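For reference, a minimal jets3t.properties sketch of the setting Thomas describes. The property name comes from the thread itself; the AES256 value and the placement under conf/ are assumptions to verify against the jets3t documentation for your version:

  # jets3t.properties -- place on Spark's classpath (e.g. in conf/)
  s3service.server-side-encryption=AES256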
Re: Mathematical functions in spark sql
Created SPARK-5427 for the addition of a floor function.

On Mon, Jan 26, 2015 at 11:20 PM, Alexey Romanchuk alexey.romanc...@gmail.com wrote:

I have tried select ceil(2/3), but got "key not found: floor".

On Tue, Jan 27, 2015 at 11:05 AM, Ted Yu yuzhih...@gmail.com wrote:

Have you tried the floor() or ceil() functions? According to http://spark.apache.org/sql/, Spark SQL is compatible with Hive SQL.

Cheers

On Mon, Jan 26, 2015 at 8:29 PM, 1esha alexey.romanc...@gmail.com wrote:

Hello everyone! I try to execute select 2/3 and I get 0. Is there any way to cast double to int or something similar? Also, it would be cool to get a list of the functions supported by Spark SQL. Thanks!
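A hedged sketch of the usual workaround for the integer-division behavior discussed above (assuming a Spark 1.2 SQLContext named sqlContext; CAST support can vary by SQL dialect, and the table and column names here are purely illustrative): cast one operand to double so the division happens in floating point.

  // "fractions" with integer columns num and den is a hypothetical table.
  val ratio = sqlContext.sql("SELECT CAST(num AS DOUBLE) / den FROM fractions")
  // yields 0.666... for num=2, den=3, instead of the 0 produced by integer division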
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda
Hi all,

I can run a Spark job programmatically in J2SE with the following code without any error:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TestSpark {
  public static void main(String[] args) {
    String sourcePath = "hdfs://spark:54310/input/*";
    SparkConf conf = new SparkConf().setAppName("TestLineCount");
    conf.setJars(new String[] { TestSpark.class.getProtectionDomain()
        .getCodeSource().getLocation().getPath() });
    conf.setMaster("spark://tootak:7077");
    conf.set("spark.driver.allowMultipleContexts", "true");
    @SuppressWarnings("resource")
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> log = sc.textFile(sourcePath);
    JavaRDD<String> lines = log.filter(x -> {
      return true;
    });
    System.out.println(lines.count());
  }
}

But when I run this snippet through the sparkjava.com framework as a web service, a java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda occurs!

package blah;

import static spark.Spark.get;
import javax.ws.rs.core.Response;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import spark.servlet.SparkApplication;

public class App implements SparkApplication {
  @Override
  public void init() {
    get("/hello", (req, res) -> {
      String sourcePath = "hdfs://spark:54310/input/*";
      SparkConf conf = new SparkConf().setAppName("TestLineCount");
      conf.setJars(new String[] { App.class.getProtectionDomain()
          .getCodeSource().getLocation().getPath() });
      conf.setMaster("spark://tootak:7077");
      conf.set("spark.driver.allowMultipleContexts", "true");
      @SuppressWarnings("resource")
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaRDD<String> log = sc.textFile(sourcePath);
      JavaRDD<String> lines = log.filter(x -> {
        return true;
      });
      return Response.ok(lines.count()).build();
    });
  }
}

When I run this web-service API with Tomcat, the following error occurs:

[task-result-getter-0] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, tootak): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here is the full error log: https://gist.github.com/khajavi/89f802398d6f8c40e23e#file-error-log

Any suggestion how to resolve this problem?

--
Milād Khājavi
http://blog.khajavi.ir
Having the source means you can do it yourself. I tried to change the world, but I couldn’t find the source code.
Re: [documentation] Update the python example ALS of the site?
This will be fixed by https://github.com/apache/spark/pull/4226

On Tue, Jan 27, 2015 at 8:17 AM, gen tang gen.tan...@gmail.com wrote:

Hi,

In Spark 1.2.0, the ratings are required to be an RDD of Rating or tuple or list. However, the current example on the site still uses RDD[array] as the ratings. Therefore, the example doesn't work under version 1.2.0. Maybe we should update the documentation on the site?

Thanks a lot. Cheers
Gen
Re: NegativeArraySizeException in pyspark when loading an RDD pickleFile
Maybe it's caused by integer overflow. Is it possible that one object or batch is bigger than 2 GB (after pickling)?

On Tue, Jan 27, 2015 at 7:59 AM, rok rokros...@gmail.com wrote:

I've got a dataset saved with saveAsPickleFile using pyspark -- it saves without problems. When I try to read it back in, it fails with:

Job aborted due to stage failure: Task 401 in stage 0.0 failed 4 times, most recent failure: Lost task 401.3 in stage 0.0 (TID 449, e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException:
    org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119)
    org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98)
    org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153)
    org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
    org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
    org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875)
    org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848)
    org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
    org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
    org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)
    org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)
    org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330)
    org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
    org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
    org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
    org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
    org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)

Not really sure where to start looking for the culprit -- any suggestions most welcome. Thanks!

Rok
Re: Spark (Streaming?) holding on to Mesos resources
Hi Gerard,

As others have mentioned, I believe you're hitting MESOS-1688. Can you upgrade to the latest Mesos release (0.21.1) and let us know if it resolves your problem?

Thanks, Tim

On Tue, Jan 27, 2015 at 10:39 AM, Sam Bessalah samkiller@gmail.com wrote:

Hi Gerard, isn't this the same issue as this? https://issues.apache.org/jira/browse/MESOS-1688

On Mon, Jan 26, 2015 at 9:17 PM, Gerard Maas gerard.m...@gmail.com wrote:

Hi,

We are observing with certain regularity that our Spark jobs, as Mesos frameworks, are hoarding resources and not releasing them, resulting in resource starvation for all jobs running on the Mesos cluster.

For example, this is a job that has spark.cores.max = 4 and spark.executor.memory=3g:

ID                   Framework     Host                  CPUs  Mem
…5050-16506-1146497  FooStreaming  dnode-4.hdfs.private  7     13.4 GB
…5050-16506-1146495  FooStreaming  dnode-0.hdfs.private  1     6.4 GB
…5050-16506-1146491  FooStreaming  dnode-5.hdfs.private  7     11.9 GB
…5050-16506-1146449  FooStreaming  dnode-3.hdfs.private  7     4.9 GB
…5050-16506-1146247  FooStreaming  dnode-1.hdfs.private  0.5   5.9 GB
…5050-16506-1146226  FooStreaming  dnode-2.hdfs.private  3     7.9 GB
…5050-16506-1144069  FooStreaming  dnode-3.hdfs.private  1     8.7 GB
…5050-16506-1133091  FooStreaming  dnode-5.hdfs.private  1     1.7 GB
…5050-16506-1133090  FooStreaming  dnode-2.hdfs.private  5     5.2 GB
…5050-16506-1133089  FooStreaming  dnode-1.hdfs.private  6.5   6.3 GB
…5050-16506-1133088  FooStreaming  dnode-4.hdfs.private  1     251 MB
…5050-16506-1133087  FooStreaming  dnode-0.hdfs.private  6.4   6.8 GB

The only way to release the resources is by manually finding the process in the cluster and killing it. The jobs are often streaming, but batch jobs also show this behavior. We have more streaming jobs than batch, so the stats are biased. Any ideas of what's up here? Hopefully some very bad ugly bug that has been fixed already and that will urge us to upgrade our infra?

Mesos 0.20 + Marathon 0.7.4 + Spark 1.1.0

-kr, Gerard.
Re: Spark (Streaming?) holding on to Mesos resources
Hi Gerard, isn't this the same issue as this? https://issues.apache.org/jira/browse/MESOS-1688

On Mon, Jan 26, 2015 at 9:17 PM, Gerard Maas gerard.m...@gmail.com wrote:

Hi,

We are observing with certain regularity that our Spark jobs, as Mesos frameworks, are hoarding resources and not releasing them, resulting in resource starvation for all jobs running on the Mesos cluster.

For example, this is a job that has spark.cores.max = 4 and spark.executor.memory=3g:

ID                   Framework     Host                  CPUs  Mem
…5050-16506-1146497  FooStreaming  dnode-4.hdfs.private  7     13.4 GB
…5050-16506-1146495  FooStreaming  dnode-0.hdfs.private  1     6.4 GB
…5050-16506-1146491  FooStreaming  dnode-5.hdfs.private  7     11.9 GB
…5050-16506-1146449  FooStreaming  dnode-3.hdfs.private  7     4.9 GB
…5050-16506-1146247  FooStreaming  dnode-1.hdfs.private  0.5   5.9 GB
…5050-16506-1146226  FooStreaming  dnode-2.hdfs.private  3     7.9 GB
…5050-16506-1144069  FooStreaming  dnode-3.hdfs.private  1     8.7 GB
…5050-16506-1133091  FooStreaming  dnode-5.hdfs.private  1     1.7 GB
…5050-16506-1133090  FooStreaming  dnode-2.hdfs.private  5     5.2 GB
…5050-16506-1133089  FooStreaming  dnode-1.hdfs.private  6.5   6.3 GB
…5050-16506-1133088  FooStreaming  dnode-4.hdfs.private  1     251 MB
…5050-16506-1133087  FooStreaming  dnode-0.hdfs.private  6.4   6.8 GB

The only way to release the resources is by manually finding the process in the cluster and killing it. The jobs are often streaming, but batch jobs also show this behavior. We have more streaming jobs than batch, so the stats are biased. Any ideas of what's up here? Hopefully some very bad ugly bug that has been fixed already and that will urge us to upgrade our infra?

Mesos 0.20 + Marathon 0.7.4 + Spark 1.1.0

-kr, Gerard.
Bulk loading into hbase using saveAsNewAPIHadoopFile
Hi Team,

I need some help on writing a Scala job to bulk load some data into HBase.

Env:
hbase 0.94
spark-1.0.2

I am trying the code below to just bulk load some data into the HBase table "t1":

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})

rdd.saveAsNewAPIHadoopFile("/tmp/8", classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)

However I am always getting the error below:

java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My questions are:
1. Do we have sample code to do a bulk load into HBase directly? Can we use saveAsNewAPIHadoopFile?
2. Is there any other way to do this? For example, first write an HFile on HDFS, and then use an HBase command to bulk load? Any sample code using Scala?

Thanks.

--
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
Here is the method signature used by HFileOutputFormat:

public void write(ImmutableBytesWritable row, KeyValue kv)

Meaning, KeyValue is expected, not Put.

On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote:

Hi Team,

I need some help on writing a Scala job to bulk load some data into HBase.

Env:
hbase 0.94
spark-1.0.2

I am trying the code below to just bulk load some data into the HBase table "t1":

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})

rdd.saveAsNewAPIHadoopFile("/tmp/8", classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)

However I am always getting the error below:

java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My questions are:
1. Do we have sample code to do a bulk load into HBase directly? Can we use saveAsNewAPIHadoopFile?
2. Is there any other way to do this? For example, first write an HFile on HDFS, and then use an HBase command to bulk load? Any sample code using Scala?

Thanks.

--
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: Issues when combining Spark and a third party java library
To clarify: I'm currently working on this locally, running on a laptop, and I do not use spark-submit (I'm using Eclipse to run my applications currently). I've tried running both on Mac OS X and in a VM running Ubuntu. Furthermore, I got the VM from a fellow worker who has no issues running his Spark and Hadoop applications on his machine while using the same VM. (So all the issues seem to pop up just by introducing a new dependency in the project.)
Re: Spark (Streaming?) holding on to Mesos Resources
Hopefully some very bad ugly bug that has been fixed already and that will urge us to upgrade our infra? Mesos 0.20 + Marathon 0.7.4 + Spark 1.1.0

Could be https://issues.apache.org/jira/browse/MESOS-1688 (fixed in Mesos 0.21)

On Mon, Jan 26, 2015 at 2:45 PM, Gerard Maas gerard.m...@gmail.com wrote:

Hi Jörn,

A memory leak in the job would be contained within the resources reserved for it, wouldn't it? And the job holding resources is not always the same. Sometimes it's one of the streaming jobs, sometimes it's a heavy batch job that runs every hour. It looks to me that whatever is causing the issue, it's participating in the resource offer protocol of Mesos, and my first suspect would be the Mesos scheduler in Spark. (The table above is the Offers tab from the Mesos UI.) Are there any other factors involved in the offer acceptance/rejection between Mesos and a scheduler? What do you think?

-kr, Gerard.

On Mon, Jan 26, 2015 at 11:23 PM, Jörn Franke jornfra...@gmail.com wrote:

Hi,

What do your jobs do? Ideally post source code, but some description would already be helpful to support you. Memory leaks can have several reasons - it may not be Spark at all. Thank you.

On 26 Jan 2015 at 22:28, Gerard Maas gerard.m...@gmail.com wrote:

(Looks like the list didn't like an HTML table in the previous email. My excuses for any duplicates.)

Hi,

We are observing with certain regularity that our Spark jobs, as Mesos frameworks, are hoarding resources and not releasing them, resulting in resource starvation for all jobs running on the Mesos cluster.

For example, this is a job that has spark.cores.max = 4 and spark.executor.memory=3g:

ID                   Framework     Host                  CPUs  Mem
…5050-16506-1146497  FooStreaming  dnode-4.hdfs.private  7     13.4 GB
…5050-16506-1146495  FooStreaming  dnode-0.hdfs.private  1     6.4 GB
…5050-16506-1146491  FooStreaming  dnode-5.hdfs.private  7     11.9 GB
…5050-16506-1146449  FooStreaming  dnode-3.hdfs.private  7     4.9 GB
…5050-16506-1146247  FooStreaming  dnode-1.hdfs.private  0.5   5.9 GB
…5050-16506-1146226  FooStreaming  dnode-2.hdfs.private  3     7.9 GB
…5050-16506-1144069  FooStreaming  dnode-3.hdfs.private  1     8.7 GB
…5050-16506-1133091  FooStreaming  dnode-5.hdfs.private  1     1.7 GB
…5050-16506-1133090  FooStreaming  dnode-2.hdfs.private  5     5.2 GB
…5050-16506-1133089  FooStreaming  dnode-1.hdfs.private  6.5   6.3 GB
…5050-16506-1133088  FooStreaming  dnode-4.hdfs.private  1     251 MB
…5050-16506-1133087  FooStreaming  dnode-0.hdfs.private  6.4   6.8 GB

The only way to release the resources is by manually finding the process in the cluster and killing it. The jobs are often streaming, but batch jobs also show this behavior. We have more streaming jobs than batch, so the stats are biased. Any ideas of what's up here? Hopefully some very bad ugly bug that has been fixed already and that will urge us to upgrade our infra?

Mesos 0.20 + Marathon 0.7.4 + Spark 1.1.0

-kr, Gerard.
RE: [SQL] Self join with ArrayType columns problems
The root cause of this is probably that identical "exprId"s of the "AttributeReference"s exist while doing a self-join with a "temp table" (temp table = resolved logical plan). I will do the bug fixing and JIRA creation.

Cheng Hao

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Tuesday, January 27, 2015 12:05 AM
To: Dean Wampler
Cc: Pierre B; user@spark.apache.org; Cheng Hao
Subject: Re: [SQL] Self join with ArrayType columns problems

It seems likely that there is some sort of bug related to the reuse of array objects that are returned by UDFs. Can you open a JIRA?

I'll also note that the sql method on HiveContext does run HiveQL (configured by spark.sql.dialect) and the hql method has been deprecated since 1.1 (and will probably be removed in 1.3). The errors are probably because array and collect_set are Hive UDFs and thus not available in a SQLContext.

On Mon, Jan 26, 2015 at 5:44 AM, Dean Wampler deanwamp...@gmail.com wrote:

You are creating a HiveContext, then using the sql method instead of hql. Is that deliberate? The code doesn't work if you replace HiveContext with SQLContext. Lots of exceptions are thrown, but I don't have time to investigate now.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition (O'Reilly) http://shop.oreilly.com/product/0636920033073.do
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Jan 26, 2015 at 7:17 AM, Pierre B pierre.borckm...@realimpactanalytics.com wrote:

Using Spark 1.2.0, we are facing some weird behaviour when performing a self join on a table with an ArrayType field (potential bug?). I have set up a minimal non-working example here: https://gist.github.com/pierre-borckmans/4853cd6d0b2f2388bf4f

In a nutshell, if the ArrayType column used for the pivot is created manually in the StructType definition, everything works as expected. However, if the ArrayType pivot column is obtained by a SQL query (be it by using an array wrapper, or by using a collect_list operator for instance), then the results are completely off.

Could anyone have a look, as this really is a blocking issue?

Thanks! Cheers, P.
Re: HDFS Namenode in safemode when I turn off my EC2 instance
Thanks Akhil!

1) I had to do sudo -u hdfs hdfs dfsadmin -safemode leave
a) I had created a user called hdfs with superuser privileges in Hue, hence the double hdfs.

2) Lastly, I know this is getting a bit off topic, but this is my /etc/hosts file:

127.0.0.1 localhost.localdomain localhost
::1 localhost6.localdomain6 localhost6

Does this look okay? I used http://blogs.aws.amazon.com/bigdata/post/Tx2D0J7QOVRJBRX/Deploying-Cloudera-s-Enterprise-Data-Hub-on-AWS to deploy my cluster. I've tried pinging the NAT instance; that didn't work. I tried changing the security group of the individual instances to allow any inbound/outbound traffic (bad practice, but I set it back to the normal configuration when it failed). The only ping that works is pinging the private (only private, not public) IPs of other instances in the subnet. I know AWS is out of scope, so no worries if you can't get to this!

Thanks, Su

On Mon, Jan 26, 2015 at 8:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

The command would be: hadoop dfsadmin -safemode leave

If you are not able to ping your instances, it can be because you are blocking all ICMP requests. I'm not quite sure why you are not able to ping google.com from your instances. Make sure the internal IP (ifconfig) is correct in the file /etc/hosts. And regarding Kafka, I'm assuming that you are running on a single instance and hence you are not having any issues (mostly, it binds to localhost in that case).

On 27 Jan 2015 07:25, Su She suhsheka...@gmail.com wrote:

Hello Sean and Akhil,

I shut down the services in Cloudera Manager. I shut them down in the appropriate order and then stopped all services of CM. I then shut down my instances. I then turned my instances back on, but I am getting the same error.

1) I tried hadoop fs -safemode leave and it said -safemode is an unknown command, but it does recognize hadoop fs.

2) I also noticed I can't ping my instances from my personal laptop and I can't ping google.com from my instances. However, I can still run my Kafka Zookeeper/server/console producer/consumer. I know this is the Spark thread, but thought that might be relevant.

Thank you for any suggestions!

Best, Su

On Thu, Jan 22, 2015 at 2:41 AM, Sean Owen so...@cloudera.com wrote:

If you are using CDH, you would be shutting down services with Cloudera Manager. I believe you can do it manually using Linux 'services' if you do the steps correctly across your whole cluster. I'm not sure if the stock stop-all.sh script is supposed to work. Certainly, if you are using CM, by far the easiest is to start/stop all of these things in CM.

On Wed, Jan 21, 2015 at 6:08 PM, Su She suhsheka...@gmail.com wrote:

Hello Sean and Akhil,

I tried running the stop-all.sh script on my master and I got this message:

localhost: Permission denied (publickey,gssapi-keyex,gssapi-with-mic).
chown: changing ownership of `/opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/logs': Operation not permitted
no org.apache.spark.deploy.master.Master to stop

I am running Spark (YARN) via Cloudera Manager. I tried stopping it from Cloudera Manager first, but it looked like it was only stopping the history server, so I started Spark again and tried ./stop-all.sh and got the above message. Also, what is the command for shutting down storage, or can I simply stop HDFS in Cloudera Manager?

Thank you for the help!

On Sat, Jan 17, 2015 at 12:58 PM, Su She suhsheka...@gmail.com wrote:

Thanks Akhil and Sean for the responses. I will try shutting down Spark, then storage, and then the instances. Initially, when HDFS was in safe mode, I waited for 1 hour and the problem still persisted. I will try this new method.

Thanks!

On Sat, Jan 17, 2015 at 2:03 AM, Sean Owen so...@cloudera.com wrote:

You would not want to turn off storage underneath Spark. Shut down Spark first, then storage, then shut down the instances. Reverse the order when restarting. HDFS will be in safe mode for a short time after being started, before it becomes writeable. I would first check that it's not just that. Otherwise, find out why the cluster went into safe mode from the logs, fix it, and then leave safe mode.

On Sat, Jan 17, 2015 at 9:03 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Safest way would be to first shutdown HDFS and then shutdown Spark (calling stop-all.sh would do) and then shutdown the machines. You can execute the following command to disable safe mode:

hadoop fs -safemode leave

Thanks
Best Regards

On Sat, Jan 17, 2015 at 8:31 AM, Su She suhsheka...@gmail.com wrote:

Hello Everyone,

I am encountering trouble running Spark applications when I shut down my EC2 instances. Everything else seems to work except Spark. When I try running a simple Spark application, like sc.parallelize()
Re: Spark and S3 server side encryption
Spark uses the Hadoop filesystems. I assume you are trying to use s3n://, which, under the hood, uses the third-party jets3t library. It is configured through the jets3t.properties file (google "hadoop s3n jets3t"), which you should put on Spark's classpath. The setting you are looking for is s3service.server-side-encryption.

The latest version of Hadoop (2.6) introduces a new and improved s3a:// filesystem which has the official SDK from Amazon under the hood.

On Mon, Jan 26, 2015 at 10:01 PM, curtkohler c.koh...@elsevier.com wrote:

We are trying to create a Spark job that writes out a file to S3 that leverages S3's server side encryption for sensitive data. Typically this is accomplished by setting the appropriate header on the put request, but it isn't clear whether this capability is exposed in the Spark/Hadoop APIs. Does anyone have any suggestions?
RDD.combineBy
Hi All,

I have a use case where I have an RDD (not a k,v pair) on which I want to do a combineByKey() operation. I can do that by creating an intermediate RDD of k,v pairs and using PairRDDFunctions.combineByKey(). However, I believe it would be more efficient if I could avoid this intermediate RDD. Is there a way I can do this by passing in a function that extracts the key, like in RDD.groupBy()? [oops, RDD.groupBy seems to create the intermediate RDD anyway; maybe a better implementation is possible for that too?] If not, is it worth adding to the Spark API?

Mohit.
Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
Thanks Ted. Could you give me a simple example of loading one row of data into HBase? How should I generate the KeyValue? I tried multiple times and still can not figure it out.

On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu yuzhih...@gmail.com wrote:

Here is the method signature used by HFileOutputFormat:

public void write(ImmutableBytesWritable row, KeyValue kv)

Meaning, KeyValue is expected, not Put.

On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote:

Hi Team,

I need some help on writing a Scala job to bulk load some data into HBase.

Env:
hbase 0.94
spark-1.0.2

I am trying the code below to just bulk load some data into the HBase table "t1":

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})

rdd.saveAsNewAPIHadoopFile("/tmp/8", classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat], conf)

However I am always getting the error below:

java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My questions are:
1. Do we have sample code to do a bulk load into HBase directly? Can we use saveAsNewAPIHadoopFile?
2. Is there any other way to do this? For example, first write an HFile on HDFS, and then use an HBase command to bulk load? Any sample code using Scala?

Thanks.

--
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)

--
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
Re: RDD.combineBy
Have you looked at the `aggregate` function in the RDD API? If your way of extracting the "key" (identifier) and "value" (payload) parts of the RDD elements is uniform (a function), however, it's unclear to me how this would be more efficient than extracting key and value and then using combine.

— FG

On Tue, Jan 27, 2015 at 10:17 PM, Mohit Jaggi mohitja...@gmail.com wrote:

Hi All,

I have a use case where I have an RDD (not a k,v pair) on which I want to do a combineByKey() operation. I can do that by creating an intermediate RDD of k,v pairs and using PairRDDFunctions.combineByKey(). However, I believe it would be more efficient if I could avoid this intermediate RDD. Is there a way I can do this by passing in a function that extracts the key, like in RDD.groupBy()? [oops, RDD.groupBy seems to create the intermediate RDD anyway; maybe a better implementation is possible for that too?] If not, is it worth adding to the Spark API?

Mohit.
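A minimal sketch of the intermediate-RDD approach being discussed, assuming a spark-shell SparkContext sc; the Event type and key extractor are hypothetical, purely for illustration. Note that keyBy only builds a lazy (k, v) view, so no extra pass or shuffle happens before the combine:

// Hypothetical element type -- adjust to your data.
case class Event(user: String, bytes: Long)
val events = sc.parallelize(Seq(Event("a", 1L), Event("a", 2L), Event("b", 3L)))

// keyBy creates the pair RDD; combineByKey then sums bytes per user.
val perUser = events.keyBy(_.user).combineByKey(
  (e: Event) => e.bytes,                   // createCombiner
  (acc: Long, e: Event) => acc + e.bytes,  // mergeValue
  (a: Long, b: Long) => a + b              // mergeCombiners
)
perUser.collect()  // Array((a,3), (b,3))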
Re: Storing DecisionTreeModel
Ok, thanks for your reply!

On Tue, Jan 27, 2015 at 2:32 PM, Joseph Bradley jos...@databricks.com wrote:

Hi Andres,

Currently, serializing the object is probably the best way to do it. However, there are efforts to support actual model import/export:
https://issues.apache.org/jira/browse/SPARK-4587
https://issues.apache.org/jira/browse/SPARK-1406
I'm hoping to have the PR for the first JIRA ready soon.

Joseph

On Tue, Jan 27, 2015 at 7:45 AM, andresbm84 andresb...@gmail.com wrote:

Hi everyone,

Is there a way to save the model on disk to reuse it later? I could serialize the object and save the bytes, but I'm guessing there might be a better way to do so. Has anyone tried that?

Andres.

--
Andrés Blanco Morales
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi Antony,

If you look in the YARN NodeManager logs, do you see that it's killing the executors? Or are they crashing for a different reason?

-Sandy

On Tue, Jan 27, 2015 at 12:43 PM, Antony Mayi antonym...@yahoo.com.invalid wrote:

Hi,

I am using spark.yarn.executor.memoryOverhead=8192, yet executors are getting crashed with this error. Does that mean I genuinely don't have enough RAM, or is this a matter of config tuning?

Other config options used:
spark.storage.memoryFraction=0.3
SPARK_EXECUTOR_MEMORY=14G

Running Spark 1.2.0 as yarn-client on a cluster of 10 nodes (the workload is ALS trainImplicit on a ~15GB dataset).

Thanks for any ideas,
Antony.
Re: Index wise most frequently occuring element
Use combineByKey. For the top 10 as an example (the bottom 10 works similarly): add the element to a list; if the list is larger than 10, delete the smallest elements until the size is back to 10.

-Sven

On Tue, Jan 27, 2015 at 3:35 AM, kundan kumar iitr.kun...@gmail.com wrote:

I have an array of the form

val array: Array[(Int, (String, Int))] = Array(
  (idx1, (word1, count1)),
  (idx2, (word2, count2)),
  (idx1, (word1, count1)),
  (idx3, (word3, count1)),
  (idx4, (word4, count4)))

I want to get the top 10 and bottom 10 elements from this array for each index (idx1, idx2, ...). Basically I want the top 10 most frequently occurring and the bottom 10 least frequently occurring elements for each index value. Please suggest how to achieve this in Spark in the most efficient way. I have tried it using for loops for each index, but this makes the program too slow and it runs sequentially.

Thanks,
Kundan

--
http://sites.google.com/site/krasser/?utm_source=sig
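A minimal sketch of Sven's suggestion, assuming the array is first parallelized into an RDD; the trim-to-10 is done with a sort for clarity rather than speed:

val rdd = sc.parallelize(array)  // RDD[(Int, (String, Int))]

val top10PerIdx = rdd.combineByKey(
  (v: (String, Int)) => List(v),                      // createCombiner: start a list
  (acc: List[(String, Int)], v: (String, Int)) =>
    (v :: acc).sortBy(-_._2).take(10),                // mergeValue: keep the 10 largest counts
  (a: List[(String, Int)], b: List[(String, Int)]) =>
    (a ++ b).sortBy(-_._2).take(10)                   // mergeCombiners: merge and trim again
)

The bottom 10 works the same way with sortBy(_._2) instead of sortBy(-_._2).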
Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
I used the code below, and it still failed with the same error. Does anyone have experience with bulk loading using Scala? Thanks.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})

rdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

On Tue, Jan 27, 2015 at 12:17 PM, Jim Green openkbi...@gmail.com wrote:

Thanks Ted. Could you give me a simple example of loading one row of data into HBase? How should I generate the KeyValue? I tried multiple times and still can not figure it out.

On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu yuzhih...@gmail.com wrote:

Here is the method signature used by HFileOutputFormat:

public void write(ImmutableBytesWritable row, KeyValue kv)

Meaning, KeyValue is expected, not Put.

On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote:

Hi Team,

I need some help on writing a Scala job to bulk load some data into HBase.

Env:
hbase 0.94
spark-1.0.2

I am trying the code below to just bulk load some data into the HBase table "t1".

[same code as above, writing to /tmp/8 with classOf[Put]]

However I am always getting the error below:

java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.KeyValue
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

My questions are:
1. Do we have sample code to do a bulk load into HBase directly? Can we use
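Note that the snippet above still builds a Put inside the map, which is exactly what Ted flagged as the cause of the ClassCastException: HFileOutputFormat writes KeyValue, so the RDD elements themselves must be (ImmutableBytesWritable, KeyValue) pairs. A hedged sketch of the changed map step, assuming the HBase 0.94 KeyValue(row, family, qualifier, value) constructor and untested against this exact setup:

val rdd = num.map(x => {
  val rowKey = Bytes.toBytes(x)
  // Build a KeyValue directly instead of a Put.
  val kv = new KeyValue(rowKey, "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(rowKey), kv)
})
rdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], conf)

One more caveat: HFileOutputFormat expects rows to arrive in sorted key order, so a sortByKey() before saving may be needed for real data.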
How to start spark master on windows
I downloaded Spark 1.2.0 on my Windows Server 2008 machine. How do I start the Spark master? I tried to run the following at the command prompt:

C:\spark-1.2.0-bin-hadoop2.4> bin\spark-class.cmd org.apache.spark.deploy.master.Master

I got the error "else was unexpected at this time."

Ningjun
Re: Storing DecisionTreeModel
Hi Andres,

Currently, serializing the object is probably the best way to do it. However, there are efforts to support actual model import/export:
https://issues.apache.org/jira/browse/SPARK-4587
https://issues.apache.org/jira/browse/SPARK-1406
I'm hoping to have the PR for the first JIRA ready soon.

Joseph

On Tue, Jan 27, 2015 at 7:45 AM, andresbm84 andresb...@gmail.com wrote:

Hi everyone,

Is there a way to save the model on disk to reuse it later? I could serialize the object and save the bytes, but I'm guessing there might be a better way to do so. Has anyone tried that?

Andres.
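A minimal sketch of the plain-Java-serialization route Joseph mentions, assuming a trained val model: DecisionTreeModel (the model class is Serializable in MLlib 1.2; the path is illustrative):

import java.io._
import org.apache.spark.mllib.tree.model.DecisionTreeModel

// Save the model to local disk via Java serialization.
val oos = new ObjectOutputStream(new FileOutputStream("/tmp/dtree.model"))
try oos.writeObject(model) finally oos.close()

// Load it back later.
val ois = new ObjectInputStream(new FileInputStream("/tmp/dtree.model"))
val restored = try ois.readObject().asInstanceOf[DecisionTreeModel] finally ois.close()

The bytes are tied to the Spark version that wrote them, which is part of why the JIRAs above aim at a proper import/export format.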
java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi,

I am using spark.yarn.executor.memoryOverhead=8192, yet executors are getting crashed with this error. Does that mean I genuinely don't have enough RAM, or is this a matter of config tuning?

Other config options used:
spark.storage.memoryFraction=0.3
SPARK_EXECUTOR_MEMORY=14G

Running Spark 1.2.0 as yarn-client on a cluster of 10 nodes (the workload is ALS trainImplicit on a ~15GB dataset).

Thanks for any ideas,
Antony.
Running a script on scala-shell on Spark Standalone Cluster
I configured 4 PCs with spark-1.2.0-bin-hadoop2.4.tgz as a Spark Standalone cluster.

On the master PC I executed:
./sbin/start-master.sh
./sbin/start-slaves.sh (4 instances)

On the datanode1, datanode2, and secondarymaster PCs I executed:
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077

So when I go to https://localhost:8080 I see this:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n21396/Screen_Shot_2015-01-28_at_2.png

But when I run a script I see errors like those below, and I don't know why those errors occur:

:load Recommender/Recommender.scala
Loading Recommender/Recommender.scala...
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import java.io._
import scala.util.control.Breaks._
import scala.collection.mutable.ListBuffer
data: org.apache.spark.rdd.RDD[String] = data/cut/ratings.txt MappedRDD[819] at textFile at console:51
users: org.apache.spark.rdd.RDD[String] = data/cut/user.txt MappedRDD[821] at textFile at console:51
ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MappedRDD[822] at map at console:55
rank: Int = 10
numIterations: Int = 20
15/01/28 02:13:18 ERROR TaskSetManager: Task 1 in stage 105.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 105.0 failed 4 times, most recent failure: Lost task 1.3 in stage 105.0 (TID 212, datanode2): java.io.FileNotFoundException: File file:/home/sparkuser/spark-1.2.0/spark-1.2.0-bin-hadoop2.4/data/cut/ratings.txt does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:137)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:764)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:108)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:233)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:210)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
performance of saveAsTextFile moving files from _temporary
We are running Spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for Hadoop for free, meaning we can specify gs:// paths for input and output.

We have jobs that take a couple of hours and end up with ~9k partitions, which means 9k output files. After the job is complete, it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself, depending on the circumstances. The job I mentioned previously outputs ~4MB files, and so far it has copied 1/3 of the files in 1.5 hours from _temporary to the final destination.

Is there a solution to this besides reducing the number of partitions? Has anyone else run into similar issues elsewhere? I don't remember this being an issue with MapReduce jobs and Hadoop; however, I probably wasn't tracking the transfer of the output files like I am with Spark.
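A sketch of the partition-reduction workaround the poster mentions (the target count and output path are illustrative; coalesce without shuffling just merges existing partitions, so it avoids a full reshuffle):

// Merge ~9k partitions down to ~500 output files before writing.
val merged = rdd.coalesce(500)
merged.saveAsTextFile("gs://my-bucket/output")  // hypothetical bucket path

Fewer, larger files cut the per-file rename cost when the committer moves results out of _temporary.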
Error reporting/collecting for users
Hi,

in my Spark Streaming application, computations depend on users' input in terms of
* user-defined functions
* computation rules
* etc.
that can throw exceptions in various cases (think: an exception in a UDF, division by zero, invalid access by key, etc.).

Now I am wondering about what is a good/reasonable way to deal with those errors. I think I want to continue processing (the whole stream processing pipeline should not die because of one single malformed item in the stream), i.e., catch the exception, but still I need a way to tell the user something went wrong. So how can I get the information that something went wrong back to the driver, and what is a reasonable way to do that?

While writing this, something like the following came into my mind:

val errors = sc.accumulator(...) // of type List[Throwable]
dstream.map(item => {
  Try {
    someUdf(item)
  } match {
    case Success(value) => value
    case Failure(err) =>
      errors += err // remember error
      0             // default value
  }
})

Does this make sense?

Thanks
Tobias
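For the sketch above to compile, the accumulator needs an AccumulatorParam for List[Throwable]. A minimal hedged version, assuming the Spark 1.x accumulator API (note that with an Accumulator[List[Throwable]], += takes a list, so the error would be added as errors += List(err)):

import org.apache.spark.AccumulatorParam

// Collects Throwables from the workers into one list on the driver.
implicit object ThrowableListParam extends AccumulatorParam[List[Throwable]] {
  def zero(initial: List[Throwable]): List[Throwable] = Nil
  def addInPlace(a: List[Throwable], b: List[Throwable]): List[Throwable] = a ++ b
}

val errors = sc.accumulator(List.empty[Throwable])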
Re: Large number of pyspark.daemon processes
After slimming down the job quite a bit, it looks like a call to coalesce() on a larger RDD can cause these Python worker spikes (additional details in Jira: https://issues.apache.org/jira/browse/SPARK-5395?focusedCommentId=14294570page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14294570 ). Any ideas to what coalesce() is doing that triggers the creation of additional workers? On Sat, Jan 24, 2015 at 12:27 AM, Sven Krasser kras...@gmail.com wrote: Hey Davies, Sure thing, it's filed here now: https://issues.apache.org/jira/browse/SPARK-5395 As far as a repro goes, what is a normal number of workers I should expect? Even shortly after kicking the job off, I see workers in the double-digits per container. Here's an example using pstree on a worker node (the worker node runs 16 containers, i.e. the java--bash branches below). The initial Python process per container is forked between 7 and 33 times. ├─java─┬─bash───java─┬─python───22*[python] │ │ └─89*[{java}] │ ├─bash───java─┬─python───13*[python] │ │ └─80*[{java}] │ ├─bash───java─┬─python───9*[python] │ │ └─78*[{java}] │ ├─bash───java─┬─python───24*[python] │ │ └─91*[{java}] │ ├─3*[bash───java─┬─python───19*[python]] │ │└─86*[{java}]] │ ├─bash───java─┬─python───25*[python] │ │ └─90*[{java}] │ ├─bash───java─┬─python───33*[python] │ │ └─98*[{java}] │ ├─bash───java─┬─python───7*[python] │ │ └─75*[{java}] │ ├─bash───java─┬─python───15*[python] │ │ └─80*[{java}] │ ├─bash───java─┬─python───18*[python] │ │ └─84*[{java}] │ ├─bash───java─┬─python───10*[python] │ │ └─85*[{java}] │ ├─bash───java─┬─python───26*[python] │ │ └─90*[{java}] │ ├─bash───java─┬─python───17*[python] │ │ └─85*[{java}] │ ├─bash───java─┬─python───24*[python] │ │ └─92*[{java}] │ └─265*[{java}] Each container has 2 cores in case that makes a difference. Thank you! -Sven On Fri, Jan 23, 2015 at 11:52 PM, Davies Liu dav...@databricks.com wrote: It should be a bug, the Python worker did not exit normally, could you file a JIRA for this? Also, could you show how to reproduce this behavior? On Fri, Jan 23, 2015 at 11:45 PM, Sven Krasser kras...@gmail.com wrote: Hey Adam, I'm not sure I understand just yet what you have in mind. My takeaway from the logs is that the container actually was above its allotment of about 14G. Since 6G of that are for overhead, I assumed there to be plenty of space for Python workers, but there seem to be more of those than I'd expect. Does anyone know if that is actually the intended behavior, i.e. in this case over 90 Python processes on a 2 core executor? Best, -Sven On Fri, Jan 23, 2015 at 10:04 PM, Adam Diaz adam.h.d...@gmail.com wrote: Yarn only has the ability to kill not checkpoint or sig suspend. If you use too much memory it will simply kill tasks based upon the yarn config. https://issues.apache.org/jira/browse/YARN-2172 On Friday, January 23, 2015, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Sven, What version of Spark are you running? Recent versions have a change that allows PySpark to share a pool of processes instead of starting a new one for each task. -Sandy On Fri, Jan 23, 2015 at 9:36 AM, Sven Krasser kras...@gmail.com wrote: Hey all, I am running into a problem where YARN kills containers for being over their memory allocation (which is about 8G for executors plus 6G for overhead), and I noticed that in those containers there are tons of pyspark.daemon processes hogging memory. Here's a snippet from a container with 97 pyspark.daemon processes. 
The total sum of RSS usage across all of these is 1,764,956 pages (i.e. 6.7GB on the system). Any ideas what's happening here and how I can get the number of pyspark.daemon processes back to a more reasonable count? 2015-01-23 15:36:53,654 INFO [Reporter] yarn.YarnAllocationHandler (Logging.scala:logInfo(59)) - Container marked as failed: container_1421692415636_0052_01_30. Exit status: 143. Diagnostics: Container [pid=35211,containerID=container_1421692415636_0052_01_30] is running beyond physical memory limits. Current usage: 14.9 GB of 14.5 GB physical memory used; 41.3 GB of 72.5 GB virtual memory used. Killing container. Dump of the process-tree for container_1421692415636_0052_01_30 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 54101 36625
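(An aside, offered as a guess rather than a confirmed diagnosis: the pool-of-processes change Sandy mentions is presumably the Python worker reuse that landed around Spark 1.2, so it may be worth confirming the setting has not been turned off, e.g. in spark-defaults.conf:)

# property exists as of Spark 1.2; the default is true
spark.python.worker.reuse   true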
RE: Running a script on scala-shell on Spark Standalone Cluster
Looks like the culprit is this error: FileNotFoundException: File file:/home/sparkuser/spark-1.2.0/spark-1.2.0-bin-hadoop2.4/data/cut/ratings.txt does not exist Mohammed -Original Message- From: riginos [mailto:samarasrigi...@gmail.com] Sent: Tuesday, January 27, 2015 4:24 PM To: user@spark.apache.org Subject: Running a script on scala-shell on Spark Standalone Cluster I configured 4 PCs as a Spark Standalone cluster with spark-1.2.0-bin-hadoop2.4.tgz. On the master PC I executed: ./sbin/start-master.sh ./sbin/start-slaves.sh (4 instances) On the datanode1, datanode2 and secondarymaster PCs I executed: ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077 So when I go to https://localhost:8080 I see this: http://apache-spark-user-list.1001560.n3.nabble.com/file/n21396/Screen_Shot_2015-01-28_at_2.png But when I run a script I see errors like those below, and I don't know why they occur: - :load Recommender/Recommender.scala Loading Recommender/Recommender.scala... import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.Rating import java.io._ import scala.util.control.Breaks._ import scala.collection.mutable.ListBuffer data: org.apache.spark.rdd.RDD[String] = data/cut/ratings.txt MappedRDD[819] at textFile at console:51 users: org.apache.spark.rdd.RDD[String] = data/cut/user.txt MappedRDD[821] at textFile at console:51 ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MappedRDD[822] at map at console:55 rank: Int = 10 numIterations: Int = 20 15/01/28 02:13:18 ERROR TaskSetManager: Task 1 in stage 105.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 105.0 failed 4 times, most recent failure: Lost task 1.3 in stage 105.0 (TID 212, datanode2): java.io.FileNotFoundException: File file:/home/sparkuser/spark-1.2.0/spark-1.2.0-bin-hadoop2.4/data/cut/ratings.txt does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:137) at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339) at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:764) at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:108) at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67) at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:233) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:210) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
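The usual remedies for this class of error, sketched here under assumptions (the HDFS URL is made up; the local path is the one from the error message): either put the file somewhere every worker can reach, or ship it with the job.

// Option 1: read from shared storage such as HDFS so all workers see the same path
val data = sc.textFile("hdfs://master:9000/data/cut/ratings.txt")

// Option 2: for a small file, distribute it to every node
sc.addFile("/home/sparkuser/spark-1.2.0/spark-1.2.0-bin-hadoop2.4/data/cut/ratings.txt")
// executors can then open it locally via org.apache.spark.SparkFiles.get("ratings.txt")

(A third option is simply copying the file to the identical path on every worker.)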
Re: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
Hi, Jim. Your generated RDD should be of type RDD[(ImmutableBytesWritable, KeyValue)], while your current one is RDD[(ImmutableBytesWritable, Put)]. You can go like this, and the result should be of type RDD[(ImmutableBytesWritable, KeyValue)], which can be saved with saveAsNewAPIHadoopFile:

val result = num.flatMap { v =>
  keyValueBuilder(v).map(kv => (kv, 1))
}.map(v => (new ImmutableBytesWritable(v._1.getBuffer(), v._1.getRowOffset(), v._1.getRowLength()), v._1))

where keyValueBuilder is a function of type T => List[KeyValue], for example:

val keyValueBuilder = (data: (Int, Int)) => {
  val rowkeyBytes = Bytes.toBytes(data._1)
  val colfam = Bytes.toBytes("cf")
  val qual = Bytes.toBytes("c1")
  val value = Bytes.toBytes("val_xxx")
  val kv = new KeyValue(rowkeyBytes, colfam, qual, value)
  List(kv)
}

Thanks, Sun fightf...@163.com From: Jim Green Date: 2015-01-28 04:44 To: Ted Yu CC: user Subject: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile I used the code below, and it still failed with the same error. Anyone has experience with bulk loading using Scala? Thanks.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

On Tue, Jan 27, 2015 at 12:17 PM, Jim Green openkbi...@gmail.com wrote: Thanks Ted. Could you give me a simple example to load one row of data into HBase? How should I generate the KeyValue? I tried multiple times and still can not figure it out. On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu yuzhih...@gmail.com wrote: Here is the method signature used by HFileOutputFormat: public void write(ImmutableBytesWritable row, KeyValue kv) Meaning, KeyValue is expected, not Put. On Tue, Jan 27, 2015 at 10:54 AM, Jim Green openkbi...@gmail.com wrote: Hi Team, I need some help with writing Scala to bulk load some data into HBase. Env: HBase 0.94, Spark 1.0.2. I am trying the code below to just bulk load some data into the HBase table "t1".
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val put: Put = new Put(Bytes.toBytes(x))
  put.add("cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
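Putting Ted's and Sun's points together, a minimal sketch of the corrected pairing (same table and column names as above; note that HFileOutputFormat additionally expects rows in sorted order, so real data may need a sortByKey first):

val kvRdd = num.map { x =>
  val rowkey = Bytes.toBytes(x)
  val kv = new KeyValue(rowkey, Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(rowkey), kv) // KeyValue, not Put
}
kvRdd.saveAsNewAPIHadoopFile("/tmp/13", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration)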
Re: Spark on Yarn: java.lang.IllegalArgumentException: Invalid rule
Thanks, Ted. Kerberos is enabled on the cluster. I'm new to the world of Kerberos, so please excuse my ignorance here. Do you know if there are any additional steps I need to take in addition to setting HADOOP_CONF_DIR? For instance, does hadoop.security.auth_to_local require any specific setting (the current setting for this property was set by the admin)? This cluster has Spark 1.0 installed and I can use it without any errors. On Jan 26, 2015 11:38 PM, Ted Yu yuzhih...@gmail.com wrote: Looks like Kerberos was enabled for your cluster. Can you check the config files under HADOOP_CONF_DIR? Cheers On Mon, Jan 26, 2015 at 8:17 PM, maven niranja...@gmail.com wrote: All, I recently tried to build Spark-1.2 on my enterprise server (which has Hadoop 2.3 with YARN). Here're the steps I followed for the build: $ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package $ export SPARK_HOME=/path/to/spark/folder $ export HADOOP_CONF_DIR=/etc/hadoop/conf However, when I try to work with this installation either locally or on YARN, I get the following error: Exception in thread "main" java.lang.ExceptionInInitializerError at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) at org.apache.spark.SparkContext.<init>(SparkContext.scala:232) at water.MyDriver$.main(MyDriver.scala:19) at water.MyDriver.main(MyDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.apache.spark.SparkException: Unable to load YARN support at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:199) at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:194) at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala) ...
15 more Caused by: java.lang.IllegalArgumentException: Invalid rule: L RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/L DEFAULT at org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321) at org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386) at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247) at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.<init>(YarnSparkHadoopUtil.scala:45) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:374) at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:196) ... 17 more I noticed that when I unset HADOOP_CONF_DIR, I'm able to work in local mode without any errors. I'm able to work with the pre-installed Spark 1.0, locally and on YARN, without any issues. It looks like I may be missing a configuration step somewhere. Any thoughts on what may be causing this? NR -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-java-lang-IllegalArgumentException-Invalid-rule-tp21382.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on Yarn: java.lang.IllegalArgumentException: Invalid rule
Thanks, Siddardha. I did but got the same error. Kerberos is enabled on my cluster and I may be missing a configuration step somewhere. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-java-lang-IllegalArgumentException-Invalid-rule-tp21382p21392.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark 1.2 ec2 launch script hang
Absolute path means no ~ and also verify that you have the path to the file correct. For some reason the Python code does not validate that the file exists and will hang (this is the same reason why ~ hangs). On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com wrote: Try using an absolute path to the pem file On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote: Hi, I used the spark-ec2 script of spark 1.2 to launch a cluster. I have modified the script according to https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab9753aae939b3bb33be953e2c13a70 But the script was still hung at the following message: Waiting for cluster to enter 'ssh-ready' state. Any additional thing I should do to make it succeed? Thanks. Ey-Chih Chow -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
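For reference, the shape of an invocation with an absolute key path (key pair, file and cluster names are made up; flags as in the spark-ec2 usage text):

./ec2/spark-ec2 -k mykeypair -i /home/me/mykeypair.pem launch my-cluster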
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi Anthony, What is the setting of the total amount of memory in MB that can be allocated to containers on your NodeManagers? yarn.nodemanager.resource.memory-mb Can you check this above configuration in yarn-site.xml used by the node manager process? -Guru Medasani From: Sandy Ryza sandy.r...@cloudera.com Date: Tuesday, January 27, 2015 at 3:33 PM To: Antony Mayi antonym...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded Hi Antony, If you look in the YARN NodeManager logs, do you see that it's killing the executors? Or are they crashing for a different reason? -Sandy On Tue, Jan 27, 2015 at 12:43 PM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I am using spark.yarn.executor.memoryOverhead=8192 yet getting executors crashed with this error. does that mean I have genuinely not enough RAM or is this matter of config tuning? other config options used: spark.storage.memoryFraction=0.3 SPARK_EXECUTOR_MEMORY=14G running spark 1.2.0 as yarn-client on cluster of 10 nodes (the workload is ALS trainImplicit on ~15GB dataset) thanks for any ideas, Antony.
SparkSQL Performance Tuning Options
Spark 1.2, no Hive; prefer not to use HiveContext, to avoid metastore_db. The use case is a Spark YARN app that will start and serve as a query server for multiple users, i.e., it is always up and running. At startup there is an option to cache data and also pre-compute some result sets, hash maps, etc. that would likely be asked for by client APIs. I.e., there is some option to use startup time to precompute/cache, but the query response time requirement on a large data set is very stringent. Hoping to use SparkSQL (but a combination of SQL and RDD APIs is also OK).
* Does SparkSQL execution use underlying partition information? (Data is from HDFS.)
* Are there any ways to give hints to the SparkSQL execution about any precomputed/pre-cached RDDs?
* Packages spark.sql.execution, spark.sql.execution.joins and other sql.xxx packages: is using these to tune the query plan recommended? Would like to keep this as-needed if possible.
* Features not in the current release but scheduled for an upcoming release would also be good to know.
Thanks, PS: This is not a small topic, so if someone prefers to start an offline thread on the details, I can do that and summarize the conclusions back to this thread.
Spark 1.2.x Yarn Auxiliary Shuffle Service
I've read that this is supposed to be a rather significant optimization to the shuffle system in 1.1.0 but I'm not seeing much documentation on enabling this in Yarn. I see github classes for it in 1.2.0 and a property spark.shuffle.service.enabled in the spark-defaults.conf. The code mentions that this is supposed to be run inside the Nodemanager so I'm assuming it needs to be wired up in the yarn-site.xml under the yarn.nodemanager.aux-services property?
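For what it's worth, a sketch of that wiring (service name and class as they appear in the Spark 1.2 YARN shuffle service; verify against the docs for your version):

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

The shuffle-service jar shipped with Spark also has to be on the NodeManager classpath, and spark.shuffle.service.enabled=true set on the Spark side.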
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
Since it's an executor running OOM it doesn't look like a container being killed by YARN to me. As a starting point, can you repartition your job into smaller tasks? -Sven On Tue, Jan 27, 2015 at 2:34 PM, Guru Medasani gdm...@outlook.com wrote: Hi Anthony, What is the setting of the total amount of memory in MB that can be allocated to containers on your NodeManagers? yarn.nodemanager.resource.memory-mb Can you check this above configuration in yarn-site.xml used by the node manager process? -Guru Medasani From: Sandy Ryza sandy.r...@cloudera.com Date: Tuesday, January 27, 2015 at 3:33 PM To: Antony Mayi antonym...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded Hi Antony, If you look in the YARN NodeManager logs, do you see that it's killing the executors? Or are they crashing for a different reason? -Sandy On Tue, Jan 27, 2015 at 12:43 PM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I am using spark.yarn.executor.memoryOverhead=8192 yet getting executors crashed with this error. does that mean I have genuinely not enough RAM or is this matter of config tuning? other config options used: spark.storage.memoryFraction=0.3 SPARK_EXECUTOR_MEMORY=14G running spark 1.2.0 as yarn-client on cluster of 10 nodes (the workload is ALS trainImplicit on ~15GB dataset) thanks for any ideas, Antony. -- http://sites.google.com/site/krasser/?utm_source=sig
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
Can you attach the logs where this is failing? From: Sven Krasser kras...@gmail.com Date: Tuesday, January 27, 2015 at 4:50 PM To: Guru Medasani gdm...@outlook.com Cc: Sandy Ryza sandy.r...@cloudera.com, Antony Mayi antonym...@yahoo.com, user@spark.apache.org user@spark.apache.org Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded Since it's an executor running OOM it doesn't look like a container being killed by YARN to me. As a starting point, can you repartition your job into smaller tasks? -Sven On Tue, Jan 27, 2015 at 2:34 PM, Guru Medasani gdm...@outlook.com wrote: Hi Anthony, What is the setting of the total amount of memory in MB that can be allocated to containers on your NodeManagers? yarn.nodemanager.resource.memory-mb Can you check this above configuration in yarn-site.xml used by the node manager process? -Guru Medasani From: Sandy Ryza sandy.r...@cloudera.com Date: Tuesday, January 27, 2015 at 3:33 PM To: Antony Mayi antonym...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded Hi Antony, If you look in the YARN NodeManager logs, do you see that it's killing the executors? Or are they crashing for a different reason? -Sandy On Tue, Jan 27, 2015 at 12:43 PM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I am using spark.yarn.executor.memoryOverhead=8192 yet getting executors crashed with this error. does that mean I have genuinely not enough RAM or is this matter of config tuning? other config options used: spark.storage.memoryFraction=0.3 SPARK_EXECUTOR_MEMORY=14G running spark 1.2.0 as yarn-client on cluster of 10 nodes (the workload is ALS trainImplicit on ~15GB dataset) thanks for any ideas, Antony. -- http://sites.google.com/site/krasser/?utm_source=sig
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
I have YARN configured with yarn.nodemanager.vmem-check-enabled=false and yarn.nodemanager.pmem-check-enabled=false to avoid YARN killing the containers. The stack trace is below. Thanks, Antony. 15/01/27 17:02:53 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM 15/01/27 17:02:53 ERROR executor.Executor: Exception in task 21.0 in stage 12.0 (TID 1312) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Integer.valueOf(Integer.java:642) at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:70) at scala.collection.mutable.ArrayOps$ofInt.apply(ArrayOps.scala:156) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:156) at scala.collection.SeqLike$class.distinct(SeqLike.scala:493) at scala.collection.mutable.ArrayOps$ofInt.distinct(ArrayOps.scala:156) at org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeOutLinkBlock(ALS.scala:404) at org.apache.spark.mllib.recommendation.ALS$$anonfun$15.apply(ALS.scala:459) at org.apache.spark.mllib.recommendation.ALS$$anonfun$15.apply(ALS.scala:456) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:614) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:614) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) 15/01/27 17:02:53 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main] java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Integer.valueOf(Integer.java:642) at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:70) at scala.collection.mutable.ArrayOps$ofInt.apply(ArrayOps.scala:156) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:156) at scala.collection.SeqLike$class.distinct(SeqLike.scala:493) at scala.collection.mutable.ArrayOps$ofInt.distinct(ArrayOps.scala:156) at org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeOutLinkBlock(ALS.scala:404) at
org.apache.spark.mllib.recommendation.ALS$$anonfun$15.apply(ALS.scala:459) at org.apache.spark.mllib.recommendation.ALS$$anonfun$15.apply(ALS.scala:456) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:614) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:614) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at
Index-wise most frequently occurring element
I have an array of the form

val array: Array[(Int, (String, Int))] = Array(
  (idx1, (word1, count1)),
  (idx2, (word2, count2)),
  (idx1, (word1, count1)),
  (idx3, (word3, count1)),
  (idx4, (word4, count4)))

I want to get the top 10 and bottom 10 elements from this array for each index (idx1, idx2, ...). Basically, I want the top 10 most frequently occurring and the bottom 10 least frequently occurring elements for each index value. Please suggest how to achieve this in Spark in the most efficient way. I have tried it using for loops over each index, but this makes the program too slow and it runs sequentially. Thanks, Kundan
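One approach, as a sketch (assumes the array above is parallelized, counts are Ints, and ties don't matter): keep a bounded buffer per index with aggregateByKey, so everything happens in one distributed pass instead of a sequential per-index loop.

val pairs = sc.parallelize(array) // RDD[(Int, (String, Int))]

val top10 = pairs.aggregateByKey(List.empty[(String, Int)])(
  (acc, v) => (v :: acc).sortBy(-_._2).take(10), // within a partition
  (a, b)   => (a ++ b).sortBy(-_._2).take(10)    // merging partitions
)
val bottom10 = pairs.aggregateByKey(List.empty[(String, Int)])(
  (acc, v) => (v :: acc).sortBy(_._2).take(10),
  (a, b)   => (a ++ b).sortBy(_._2).take(10)
)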
how to split key from RDD for compute UV
Hello All, I am writing a simple Spark application to count UV (unique views) from a log file. Below is my code; it is not right on the marked line. My idea here is that the same cookie on a host only counts once, so I want to split the host out of the key produced by the previous RDD. But now I don't know how to finish it. Any suggestion will be appreciated!

val url_index = args(1).toInt
val cookie_index = args(2).toInt
val textRDD = sc.textFile(args(0))
  .map(_.split("\t"))
  .map(line => ((new java.net.URL(line(url_index)).getHost) + "\t" + line(cookie_index), 1))
  .reduceByKey(_ + _)
  .map(line => (line.split("\t")(0), 1)) // <-- this line is not right
  .reduceByKey(_ + _)
  .map(item => item.swap)
  .sortByKey(false)
  .map(item => item.swap)
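A possible fix, as a sketch: after the first reduceByKey each element is a (String, Int) pair, so it is the key of the pair that has to be split, not the pair itself.

val uvPerHost = sc.textFile(args(0))
  .map(_.split("\t"))
  .map(line => (new java.net.URL(line(url_index)).getHost + "\t" + line(cookie_index), 1))
  .reduceByKey(_ + _)                          // one record per (host, cookie) pair
  .map { case (hostAndCookie, _) => (hostAndCookie.split("\t")(0), 1) }
  .reduceByKey(_ + _)                          // unique cookies per host
  .map(_.swap).sortByKey(false).map(_.swap)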
Best way to shut down a stream initiated from the recevier
Hi all, we are building a custom JDBC receiver that would create a stream from sql tables. Not sure what is the best way to shut down the stream once all the data goes through, as the receiver knows it is completed but it cannot initiate the stream to shut down. Any suggestion how to structure this? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-shut-down-a-stream-initiated-from-the-recevier-tp21390.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
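One shape this could take, as a rough sketch (`fetchRows` is a hypothetical stand-in for the JDBC plumbing): the receiver can call stop() on itself once the table is drained; stopping the whole StreamingContext still has to be initiated on the driver, e.g. by watching for the stop message or a marker record in the stream.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class JdbcReceiver(query: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    new Thread("jdbc-receiver") {
      override def run(): Unit = {
        fetchRows(query).foreach(store) // push each row into the stream
        stop("all rows emitted")        // mark this receiver as finished
      }
    }.start()
  }
  def onStop(): Unit = {}
  private def fetchRows(q: String): Iterator[String] = Iterator.empty // placeholder
}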
Re: Issues when combining Spark and a third party java library
Okay, I finally tried to change the Hadoop-client version from 2.4.0 to 2.5.2 and that mysteriously fixed everything.. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issues-when-combining-Spark-and-a-third-party-java-library-tp21367p21387.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Why must the dstream.foreachRDD(...) parameter be serializable?
Aha, you’re right, I did a wrong comparison; the reason might be only for checkpointing :). Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Wednesday, January 28, 2015 10:39 AM To: Shao, Saisai Cc: user Subject: Re: Why must the dstream.foreachRDD(...) parameter be serializable? Hi, thanks for the answers! On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai saisai.s...@intel.com wrote: Also this `foreachFunc` is more like an action function of an RDD, thinking of rdd.foreach(func), in which `func` needs to be serializable. So maybe your way of using it is not a normal way :). Yeah I totally understand why func in rdd.foreach(func) must be serializable (because it's sent to the executors), but I didn't get why a function that's not shipped around must be serializable, too. The explanations made sense, though :-) Thanks Tobias
Spark on Windows 2008 R2 serv er does not work
I downloaded and installed the pre-built spark-1.2.0-bin-hadoop2.4.tgz on a Windows 2008 R2 server. When I submit a job using spark-submit, I get the following error: WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform ... using builtin-java classes where applicable ERROR org.apache.hadoop.util.Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326) at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.<init>(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) Please advise. Thanks. Ningjun
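A commonly cited workaround, offered here as an assumption rather than something confirmed in this thread: the Hadoop shell utilities look for %HADOOP_HOME%\bin\winutils.exe on Windows, so placing a winutils.exe there and pointing hadoop.home.dir at the directory usually silences this error.

// before creating the SparkContext; the path is hypothetical
System.setProperty("hadoop.home.dir", "C:\\hadoop") // expects C:\hadoop\bin\winutils.exe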
Re: Error reporting/collecting for users
It is a Streaming application, so how/when do you plan to access the accumulator on driver? On Tue, Jan 27, 2015 at 6:48 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, thanks for your mail! On Wed, Jan 28, 2015 at 11:44 AM, Tathagata Das tathagata.das1...@gmail.com wrote: That seems reasonable to me. Are you having any problems doing it this way? Well, actually I haven't done that yet. The idea of using accumulators to collect errors just came while writing the email, but I thought I'd just keep writing and see if anyone has any other suggestions ;-) Thanks Tobias
Re: Partition + equivalent of MapReduce multiple outputs
I wanted to update this thread for others who may be looking for a solution to his as well. I found [1] and I'm going to investigate if this is a viable solution. [1] http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job On Wed, Jan 28, 2015 at 12:51 AM, Corey Nolet cjno...@gmail.com wrote: I need to be able to take an input RDD[Map[String,Any]] and split it into several different RDDs based on some partitionable piece of the key (groups) and then send each partition to a separate set of files in different folders in HDFS. 1) Would running the RDD through a custom partitioner be the best way to go about this or should I split the RDD into different RDDs and call saveAsHadoopFile() on each? 2) I need the resulting partitions sorted by key- they also need to be written to the underlying files in sorted order. 3) The number of keys in each partition will almost always be too big to fit into memory. Thanks.
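For the record, the approach in [1] boils down to overriding MultipleTextOutputFormat. A minimal sketch (key type, output layout and `numGroups` are assumptions):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  // route each record into a subdirectory named after its key
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

rdd // an RDD[(String, String)]
  .partitionBy(new HashPartitioner(numGroups)) // group keys together
  .saveAsHadoopFile("/out", classOf[String], classOf[String], classOf[KeyBasedOutput])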
Re: SparkSQL Performance Tuning Options
On 1/27/15 5:55 PM, Cheng Lian wrote: On 1/27/15 11:38 AM, Manoj Samel wrote: Spark 1.2, no Hive, prefer not to use HiveContext to avoid metastore_db. Use case is Spark Yarn app will start and serve as query server for multiple users i.e. always up and running. At startup, there is option to cache data and also pre-compute some results sets, hash maps etc. that would be likely be asked by client APIs. I.e there is some option to use startup time to precompute/cache - but query response time requirement on large data set is very stringent Hoping to use SparkSQL (but a combination of SQL and RDD APIs is also OK). * Does SparkSQL execution uses underlying partition information ? (Data is from HDFS) No. For example, if the underlying data has already been partitioned by some key, Spark SQL doesn't know it, and can't leverage that information to avoid shuffle when doing aggregation on that key. However, partitioning the data ahead of time does help minimizing shuffle network IO. There's a JIRA ticket to enable Spark SQL aware of underlying data distribution. Maybe you are asking about locality? If that's the case, just want to add that Spark SQL does understand locality information of the underlying data. It's obtained from Hadoop InputFormat. * Are there any ways to give hints to the SparkSQL execution about any precomputed/pre-cached RDDs? Instead of caching raw RDD, it's recommended to transform raw RDD to SchemaRDD and then cache it, so that in-memory columnar storage can be used. Also Spark SQL recognizes cached SchemaRDDs automatically. * Packages spark.sql.execution, spark.sql.execution.joins and other sql.xxx packages - would using these for tuning query plan is recommended? Would like to keep this as-needed if possible Not sure whether I understood this question. Are you trying to use internal APIs to do customized optimizations? * Features not in current release but scheduled for upcoming release would also be good to know. Thanks, PS: This is not a small topic so if someone prefers to start a offline thread on details, I can do that and summarize the conclusions back to this thread. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
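To make the caching advice above concrete, a small sketch (schema, path and table name are made up; Spark 1.2 APIs):

import org.apache.spark.sql.SQLContext

case class Record(key: String, value: Int) // hypothetical schema

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicit RDD-to-SchemaRDD conversion

val records = sc.textFile("hdfs:///data/records")
  .map(_.split(","))
  .map(r => Record(r(0), r(1).toInt))

records.registerTempTable("records")
sqlContext.cacheTable("records") // cached using in-memory columnar storage

sqlContext.sql("SELECT key, SUM(value) FROM records GROUP BY key").collect()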
Re: spark-submit conflicts with dependencies
I have the same problem too. org.apache.hadoop:hadoop-common:jar:2.4.0 brings in commons-configuration:commons-configuration:jar:1.6, but we're using commons-configuration:commons-configuration 1.8. Is there any workaround for this? Greg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-conflicts-with-dependencies-tp8909p21399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Error reporting/collecting for users
Hi, On Wed, Jan 28, 2015 at 1:45 PM, Soumitra Kumar kumar.soumi...@gmail.com wrote: It is a Streaming application, so how/when do you plan to access the accumulator on driver? Well... maybe there would be some user command or web interface showing the errors that have happened during processing...? Thanks Tobias
Re: spark-submit conflicts with dependencies
You can add an exclusion to Spark's pom.xml. Here is an example (for hadoop-client, which brings in hadoop-common):

diff --git a/pom.xml b/pom.xml
index 05cb379..53947d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -632,6 +632,10 @@
         <scope>${hadoop.deps.scope}</scope>
         <exclusions>
           <exclusion>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+          </exclusion>
+          <exclusion>
             <groupId>asm</groupId>
             <artifactId>asm</artifactId>
           </exclusion>

On Tue, Jan 27, 2015 at 9:33 PM, soid s...@dicefield.com wrote: I have the same problem too. org.apache.hadoop:hadoop-common:jar:2.4.0 brings in commons-configuration:commons-configuration:jar:1.6, but we're using commons-configuration:commons-configuration 1.8. Is there any workaround for this? Greg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-conflicts-with-dependencies-tp8909p21399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Partition + equivalent of MapReduce multiple outputs
I need to be able to take an input RDD[Map[String,Any]] and split it into several different RDDs based on some partitionable piece of the key (groups) and then send each partition to a separate set of files in different folders in HDFS. 1) Would running the RDD through a custom partitioner be the best way to go about this or should I split the RDD into different RDDs and call saveAsHadoopFile() on each? 2) I need the resulting partitions sorted by key- they also need to be written to the underlying files in sorted order. 3) The number of keys in each partition will almost always be too big to fit into memory. Thanks.
Storing DecisionTreeModel
Hi everyone, Is there a way to save on disk the model to reuse it later? I could serialize the object and save the bytes, but I'm guessing there might be a better way to do so. Has anyone tried that? Andres. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Storing-DecisionTreeModel-tp21393.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
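Since DecisionTreeModel is Serializable, the plain-serialization route mentioned above is workable; a sketch (the file path is made up, and this ties you to the exact MLlib version used for training):

import java.io._
import org.apache.spark.mllib.tree.model.DecisionTreeModel

val out = new ObjectOutputStream(new FileOutputStream("/tmp/dtree.model"))
out.writeObject(model) // `model` is the trained DecisionTreeModel
out.close()

val in = new ObjectInputStream(new FileInputStream("/tmp/dtree.model"))
val restored = in.readObject().asInstanceOf[DecisionTreeModel]
in.close()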
NegativeArraySizeException in pyspark when loading an RDD pickleFile
I've got an dataset saved with saveAsPickleFile using pyspark -- it saves without problems. When I try to read it back in, it fails with: Job aborted due to stage failure: Task 401 in stage 0.0 failed 4 times, most recent failure: Lost task 401.3 in stage 0.0 (TID 449, e1326.hpc-lca.ethz.ch): java.lang.NegativeArraySizeException: org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:119) org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:98) org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:153) org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67) org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40) org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1875) org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1848) org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103) org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78) org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219) org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:330) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184) org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311) org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) Not really sure where to start looking for the culprit -- any suggestions most welcome. Thanks! Rok -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-in-pyspark-when-loading-an-RDD-pickleFile-tp21395.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to split key from RDD for compute UV
Hi, Did you try asking this on StackOverflow? http://stackoverflow.com/questions/tagged/apache-spark I'd also suggest adding some sample data to help others understand your logic. -kr, Gerard. On Tue, Jan 27, 2015 at 1:14 PM, 老赵 laozh...@sina.cn wrote: Hello All, I am writing a simple Spark application to count UV (unique views) from a log file. Below is my code; it is not right on the marked line. My idea here is that the same cookie on a host only counts once, so I want to split the host out of the key produced by the previous RDD. But now I don't know how to finish it. Any suggestion will be appreciated!

val url_index = args(1).toInt
val cookie_index = args(2).toInt
val textRDD = sc.textFile(args(0))
  .map(_.split("\t"))
  .map(line => ((new java.net.URL(line(url_index)).getHost) + "\t" + line(cookie_index), 1))
  .reduceByKey(_ + _)
  .map(line => (line.split("\t")(0), 1)) // <-- this line is not right
  .reduceByKey(_ + _)
  .map(item => item.swap)
  .sortByKey(false)
  .map(item => item.swap)
Re: Spark on Yarn: java.lang.IllegalArgumentException: Invalid rule
Caused by: java.lang.IllegalArgumentException: Invalid rule: L RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/L DEFAULT Can you put the rule on a single line (not sure whether there is a newline or space between L and DEFAULT)? Looks like the characters between the last slash and DEFAULT are extraneous. Cheers On Tue, Jan 27, 2015 at 7:21 AM, Niranjan Reddy niranja...@gmail.com wrote: Thanks, Ted. Kerberos is enabled on the cluster. I'm new to the world of Kerberos, so please excuse my ignorance here. Do you know if there are any additional steps I need to take in addition to setting HADOOP_CONF_DIR? For instance, does hadoop.security.auth_to_local require any specific setting (the current setting for this property was set by the admin)? This cluster has Spark 1.0 installed and I can use it without any errors. On Jan 26, 2015 11:38 PM, Ted Yu yuzhih...@gmail.com wrote: Looks like Kerberos was enabled for your cluster. Can you check the config files under HADOOP_CONF_DIR? Cheers On Mon, Jan 26, 2015 at 8:17 PM, maven niranja...@gmail.com wrote: All, I recently tried to build Spark-1.2 on my enterprise server (which has Hadoop 2.3 with YARN). Here're the steps I followed for the build: $ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package $ export SPARK_HOME=/path/to/spark/folder $ export HADOOP_CONF_DIR=/etc/hadoop/conf However, when I try to work with this installation either locally or on YARN, I get the following error: Exception in thread "main" java.lang.ExceptionInInitializerError at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1784) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105) at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:292) at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159) at org.apache.spark.SparkContext.<init>(SparkContext.scala:232) at water.MyDriver$.main(MyDriver.scala:19) at water.MyDriver.main(MyDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.apache.spark.SparkException: Unable to load YARN support at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:199) at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:194) at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala) ...
15 more Caused by: java.lang.IllegalArgumentException: Invalid rule: L RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/L DEFAULT at org.apache.hadoop.security.authentication.util.KerberosName.parseRules(KerberosName.java:321) at org.apache.hadoop.security.authentication.util.KerberosName.setRules(KerberosName.java:386) at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:75) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:247) at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:43) at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.<init>(YarnSparkHadoopUtil.scala:45) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:374) at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:196) ... 17 more I noticed that when I unset HADOOP_CONF_DIR, I'm able to work in local mode without any errors. I'm able to work with the pre-installed Spark 1.0, locally and on YARN, without any issues. It looks like I may be missing a configuration step somewhere. Any thoughts on what may be causing this? NR -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-java-lang-IllegalArgumentException-Invalid-rule-tp21382.html Sent from the Apache Spark
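If the admin-set value really does contain a stray line break, the fix Ted describes would amount to keeping the whole value on one line in core-site.xml, roughly like this (the rule text is taken from the error above; a sketch of the shape, not a verified configuration):

<property>
  <name>hadoop.security.auth_to_local</name>
  <value>RULE:[2:$1@$0](.*@XXXCOMPANY.COM)s/(.*)@XXXCOMPANY.COM/$1/ DEFAULT</value>
</property>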
Re: saving rdd to multiple files named by the key
There is also SPARK-3533 https://issues.apache.org/jira/browse/SPARK-3533, which proposes to add a convenience method for this. On Mon Jan 26 2015 at 10:38:56 PM Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: This might be helpful: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job On Tue Jan 27 2015 at 07:45:18 Sharon Rapoport sha...@plaid.com wrote: Hi, I have an rdd of [k,v] pairs. I want to save each [v] to a file named [k]. I got them by combining many [k,v] by [k]. I could then save to file by partitions, but that still doesn't allow me to choose the name, and leaves me stuck with foo/part-... Any tips? Thanks, Sharon
Re: spark 1.2 ec2 launch script hang
For those who found that absolute vs. relative path for the pem file mattered, what OS and shell are you using? What version of Spark are you using? ~/ vs. absolute path shouldn’t matter. Your shell will expand the ~/ to the absolute path before sending it to spark-ec2. (i.e. tilde expansion.) Absolute vs. relative path (e.g. ../../path/to/pem) also shouldn’t matter, since we fixed that for Spark 1.2.0 https://issues.apache.org/jira/browse/SPARK-4137. Maybe there’s some case that we missed? Nick On Tue Jan 27 2015 at 10:10:29 AM Charles Feduke charles.fed...@gmail.com wrote: Absolute path means no ~ and also verify that you have the path to the file correct. For some reason the Python code does not validate that the file exists and will hang (this is the same reason why ~ hangs). On Mon, Jan 26, 2015 at 10:08 PM Pete Zybrick pzybr...@gmail.com wrote: Try using an absolute path to the pem file On Jan 26, 2015, at 8:57 PM, ey-chih chow eyc...@hotmail.com wrote: Hi, I used the spark-ec2 script of spark 1.2 to launch a cluster. I have modified the script according to https://github.com/grzegorz-dubicki/spark/commit/5dd8458d2ab9753aae939b3bb33be953e2c13a70 But the script was still hung at the following message: Waiting for cluster to enter 'ssh-ready' state. Any additional thing I should do to make it succeed? Thanks. Ey-Chih Chow -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-ec2-launch-script-hang-tp21381.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[documentation] Update the python example ALS of the site?
Hi, In Spark 1.2.0, ALS requires the ratings to be an RDD of Rating, tuple, or list. However, the current example on the site still uses an RDD of arrays as the ratings, so the example doesn't work under version 1.2.0. Maybe we should update the documentation on the site? Thanks a lot. Cheers Gen
Re: Mathematical functions in spark sql
Hey Alexey, You need to use HiveContext in order to access Hive UDFs. You may try it with bin/spark-sql (src is a Hive table):

spark-sql> select key / 3 from src limit 10;
79.33
28.668
103.67
9.0
55.0
136.34
85.0
92.67
32.664
161.34
spark-sql> select ceil(key / 3) from src limit 10;
80
29
104
9
55
137
85
93
33
162

Just put your hive-site.xml under $SPARK_HOME/conf. Cheng On 1/26/15 11:20 PM, Alexey Romanchuk wrote: I have tried select ceil(2/3), but got key not found: floor On Tue, Jan 27, 2015 at 11:05 AM, Ted Yu yuzhih...@gmail.com wrote: Have you tried floor() or ceil() functions? According to http://spark.apache.org/sql/, Spark SQL is compatible with Hive SQL. Cheers On Mon, Jan 26, 2015 at 8:29 PM, 1esha alexey.romanc...@gmail.com wrote: Hello everyone! I try execute select 2/3 and I get 0. Is there any way to cast double to int or something similar? Also it will be cool to get a list of functions supported by Spark SQL. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mathematical-functions-in-spark-sql-tp21383.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
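For completeness, a minimal sketch of the same thing from Scala (assumes a SparkContext `sc` and a Hive table `src`):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("SELECT ceil(key / 3) FROM src LIMIT 10").collect().foreach(println)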
joins way slow never go to completion
I have about 15-20 joins to perform. Each of these tables is on the order of 6 million to 66 million rows. The number of columns ranges from 20 to 400. I read the Parquet files and obtain SchemaRDDs, then use the join functionality on two SchemaRDDs, joining the previous join result with the next SchemaRDD. Any ideas how to deal with such a join-intensive Spark SQL process? Any advice on how to handle the joins in better ways? I will appreciate all the inputs. Thanks!
Re: performance of saveAsTextFile moving files from _temporary
This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention dealing with Hive code which did the renaming serially on the driver, which is very slow for S3 (and possibly Google Storage as well), as it actually copies the data rather than doing a metadata-only operation during rename. However, this should not be an issue in this case. Could you confirm how the moving is happening -- i.e., on the executors or the driver? On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote: We are running spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for hadoop for free meaning we can specify gs:// paths for input and output. We have jobs that take a couple of hours, end up with ~9k partitions which means 9k output files. After the job is complete it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself depending on the circumstances. The job I mentioned previously outputs ~4mb files, and so far has copied 1/3 of the files in 1.5 hours from _temporary to the final destination. Is there a solution to this besides reducing the number of partitions? Anyone else run into similar issues elsewhere? I don't remember this being an issue with Map Reduce jobs and hadoop, however, I probably wasn't tracking the transfer of the output files like I am with Spark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/performance-of-saveAsTextFile-moving-files-from-temporary-tp21397.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
I believe this is needed for driver recovery in Spark Streaming. If your Spark driver program crashes, Spark Streaming can recover the application by reading the set of DStreams and output operations from a checkpoint file (see https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing). But to do that, it needs to remember all the operations you're running periodically, including those in foreachRDD. Matei On Jan 27, 2015, at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I want to do something like

dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and runs completely within the driver. However, this fails with a NotSerializableException because $outer is not serializable etc. The DStream code says:

def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
}

To be honest, I don't understand the comment. Why must that function be serializable even when there is no RDD action involved? Thanks Tobias - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Why must the dstream.foreachRDD(...) parameter be serializable?
Hey Tobias, I think one consideration is checkpointing of the DStream, which guarantees driver fault tolerance. Also, this `foreachFunc` is more like an action function of an RDD, thinking of rdd.foreach(func), in which `func` needs to be serializable. So maybe your way of using it is not a normal way :). Thanks Jerry From: Tobias Pfeiffer [mailto:t...@preferred.jp] Sent: Wednesday, January 28, 2015 10:16 AM To: user Subject: Why must the dstream.foreachRDD(...) parameter be serializable? Hi, I want to do something like

dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and runs completely within the driver. However, this fails with a NotSerializableException because $outer is not serializable etc. The DStream code says:

def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
}

To be honest, I don't understand the comment. Why must that function be serializable even when there is no RDD action involved? Thanks Tobias
Re: Error reporting/collecting for users
Hi, thanks for your mail! On Wed, Jan 28, 2015 at 11:44 AM, Tathagata Das tathagata.das1...@gmail.com wrote: That seems reasonable to me. Are you having any problems doing it this way? Well, actually I haven't done that yet. The idea of using accumulators to collect errors only came to me while writing the email, but I thought I'd just keep writing and see if anyone has any other suggestions ;-) Thanks Tobias
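For anyone picking up the accumulator idea, a minimal sketch of collecting per-record errors on the executors and reading them back on the driver (the input path and parse logic are made up for illustration):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext

// Record parse errors in an accumulable collection instead of throwing,
// then inspect the collected messages on the driver after an action.
def parseWithErrorCollection(sc: SparkContext): Unit = {
  val errors = sc.accumulableCollection(ArrayBuffer[String]())
  val parsed = sc.textFile("hdfs:///input/records").flatMap { line =>
    try {
      Some(line.toInt)
    } catch {
      case e: NumberFormatException =>
        errors += s"unparseable record: $line" // recorded, not thrown
        None
    }
  }
  parsed.count() // accumulators are only populated once an action runs
  // caveat: updates made in transformations can be double-counted on retries
  errors.value.take(10).foreach(println)
}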
Re: performance of saveAsTextFile moving files from _temporary
I'm not sure how to confirm how the moving is happening; however, one of the jobs I was talking about, with 9k files of ~4 MB each, just completed. The Spark UI showed the job as complete after ~2 hours, and the next four hours were spent just moving the files from _temporary to their final destination. The tasks for the write were definitely shown as complete, and no logging was happening on the master or workers. The last line of my Java code logs, but the job sits there while the files are moved. On Tue, Jan 27, 2015 at 7:24 PM, Aaron Davidson ilike...@gmail.com wrote: This renaming from _temporary to the final location is actually done by executors, in parallel, for saveAsTextFile. It should be performed by each task individually before it returns. I have seen an issue similar to what you mention in Hive code, which did the renaming serially on the driver; that is very slow for S3 (and possibly Google Cloud Storage as well), because rename there actually copies the data rather than performing a metadata-only operation. However, that should not be an issue in this case. Could you confirm how the moving is happening -- i.e., on the executors or the driver? On Tue, Jan 27, 2015 at 4:31 PM, jwalton j...@openbookben.com wrote: We are running Spark in Google Compute Engine using their One-Click Deploy. By doing so, we get their Google Cloud Storage connector for Hadoop for free, meaning we can specify gs:// paths for input and output. We have jobs that take a couple of hours and end up with ~9k partitions, which means 9k output files. After the job is complete, it then moves the output files from our $output_path/_temporary to $output_path. That process can take longer than the job itself, depending on the circumstances. The job I mentioned previously outputs ~4 MB files, and so far has copied 1/3 of the files in 1.5 hours from _temporary to the final destination. Is there a solution to this besides reducing the number of partitions? Has anyone else run into similar issues elsewhere? I don't remember this being an issue with MapReduce jobs on Hadoop; however, I probably wasn't tracking the transfer of the output files like I am with Spark.
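Until the rename behavior is sorted out, the only lever mentioned in the thread is cutting the file count. A minimal sketch of doing that with coalesce before the write (the bucket, transformation, and target of 500 partitions are all made up):

import org.apache.spark.SparkContext

// Fewer partitions mean fewer files to move out of _temporary.
def writeFewerFiles(sc: SparkContext): Unit = {
  val result = sc.textFile("gs://my-bucket/input/*").map(_.toUpperCase)
  // shuffle = false merges partitions locally instead of re-shuffling
  result.coalesce(500, shuffle = false).saveAsTextFile("gs://my-bucket/output")
}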
Why must the dstream.foreachRDD(...) parameter be serializable?
Hi, I want to do something like dstream.foreachRDD(rdd => if (someCondition) ssc.stop()) so in particular the function does not touch any element in the RDD and runs completely within the driver. However, this fails with a NotSerializableException because $outer is not serializable etc. The DStream code says: def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { // because the DStream is reachable from the outer object here, and because // DStreams can't be serialized with closures, we can't proactively check // it for serializability and so we pass the optional false to SparkContext.clean new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } To be honest, I don't understand the comment. Why must that function be serializable even when there is no RDD action involved? Thanks Tobias
Re: Why must the dstream.foreachRDD(...) parameter be serializable?
Hi, thanks for the answers! On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai saisai.s...@intel.com wrote: Also, this `foreachFunc` is more like an action function of an RDD -- think of rdd.foreach(func), in which `func` needs to be serializable. So perhaps your way of using it is not the usual one :). Yeah, I totally understand why func in rdd.foreach(func) must be serializable (because it is sent to the executors), but I didn't get why a function that is never shipped around must be serializable, too. The explanations made sense, though :-) Thanks Tobias
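For readers hitting the same NotSerializableException, a minimal sketch of the usual workaround (names are hypothetical): define the callback somewhere that doesn't drag a non-serializable enclosing instance ($outer) into the closure, e.g. in a top-level object. Note that the caveat from this thread still applies: with checkpointing enabled, the function itself is serialized, so everything it references -- including the StreamingContext here -- would have to survive that; in practice this pattern only works with checkpointing off.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// A top-level object has no enclosing instance, so the closure captures
// only the explicit parameters rather than `this` via $outer.
object StopHook {
  def install(dstream: DStream[String], ssc: StreamingContext,
              someCondition: () => Boolean): Unit = {
    dstream.foreachRDD { (rdd: RDD[String]) =>
      // runs on the driver once per batch; no RDD elements are touched
      if (someCondition()) ssc.stop()
    }
  }
}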
RE: How to start Spark master on Windows
Never mind -- the problem was that Java was not installed on Windows. I installed Java and the problem went away. Regards, Ningjun Wang Consulting Software Engineer LexisNexis 121 Chanlon Road New Providence, NJ 07974-1541 From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] Sent: Tuesday, January 27, 2015 4:38 PM To: user@spark.apache.org Subject: How to start Spark master on Windows I downloaded Spark 1.2.0 on my Windows Server 2008 machine. How do I start the Spark master? I tried to run the following at the command prompt: C:\spark-1.2.0-bin-hadoop2.4> bin\spark-class.cmd org.apache.spark.deploy.master.Master I got the error "else was unexpected at this time". Ningjun
Re: spark sqlContext udaf
UDAFs are a work in progress; at least from an API user's perspective, there is no public API to my knowledge. https://issues.apache.org/jira/browse/SPARK-3947 Thanks On Tue, Jan 27, 2015 at 12:26 PM, sunwei hisun...@outlook.com wrote: Hi, can anyone show me some examples of using UDAFs with the Spark SQLContext?
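In the meantime, scalar (non-aggregating) UDFs do have a public API on SQLContext. A minimal sketch, assuming Spark 1.2's registerFunction (the function name and logic are made up):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Scalar UDFs can be registered today, even though UDAFs (SPARK-3947)
// have no public API yet. strLen is an illustrative example.
def registerExampleUdf(sc: SparkContext): Unit = {
  val sqlContext = new SQLContext(sc)
  sqlContext.registerFunction("strLen", (s: String) => s.length)
  // after registration it can be used in SQL, e.g.:
  //   SELECT strLen(name) FROM people
}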