Re: Spark runs into an Infinite loop even if the tasks are completed successfully
Yep, and it works fine for operations which do not involve any shuffle (like foreach, count, etc.), while those which involve shuffle operations end up in an infinite loop. Spark should somehow indicate this instead of going into an infinite loop.

Thanks
Best Regards

On Thu, Aug 13, 2015 at 11:37 PM, Imran Rashid iras...@cloudera.com wrote:

oh I see, you are defining your own RDD Partition types, and you had a bug where partition.index did not line up with the partition's slot in rdd.getPartitions. Is that correct?

On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

I figured that out, and these are my findings:

- It just enters an infinite loop when there's a duplicate partition id.
- It enters an infinite loop when the partition id starts from 1 rather than 0.

Something like this piece of code can reproduce it (in getPartitions()):

val total_partitions = 4
val partitionsArray: Array[Partition] = Array.ofDim[Partition](total_partitions)
var i = 0
for (outer <- 0 to 1) {
  for (partition <- 1 to total_partitions) {
    partitionsArray(i) = new DeadLockPartitions(partition)
    i = i + 1
  }
}
partitionsArray

Thanks
Best Regards

On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid iras...@cloudera.com wrote:

Yikes. Was this a one-time thing? Or does it happen consistently? Can you turn on debug logging for o.a.s.scheduler (dunno if it will help, but maybe...)?

On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Hi,

My Spark job (running in local[*] with Spark 1.4.1) reads data from a thrift server (I created an RDD; it computes the partitions in the getPartitions() call and, in compute(), hasNext returns records from these partitions). count() and foreach() are working fine and return the correct number of records. But whenever there is a ShuffleMap stage (like reduceByKey etc.), all the tasks execute properly but it enters an infinite loop saying:

15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1 (map at FilterMain.scala:59) because some of its tasks had failed: 0, 3

Here's the complete stack-trace: http://pastebin.com/hyK7cG8S

What could be the root cause of this problem? I looked up and bumped into this closed JIRA https://issues.apache.org/jira/browse/SPARK-583 (which is very, very old).

Thanks
Best Regards
Re: Spark runs into an Infinite loop even if the tasks are completed successfully
What I understood from Imran's mail (and what was referenced in his mail) is that the RDD mentioned seems to be violating some basic contracts on how partitions are used in Spark [1]. They cannot be arbitrarily numbered, have duplicates, etc.

Extending RDD to add functionality is typically for niche cases, and it requires subclasses to adhere to the explicit (and implicit) contracts/lifecycles for them. Using existing RDDs as templates would be a good idea for customizations; one way to look at it is that using RDDs is more in API space, but extending them is more in SPI space. Violations would actually not even be detectable by spark-core in the general case.

Regards,
Mridul

[1] Ignoring the array out of bounds, etc. I am assuming the intent is to show overlapping partitions, duplicates, index-to-partition mismatch, that sort of thing.

On Thu, Aug 13, 2015 at 11:42 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Yep, and it works fine for operations which do not involve any shuffle (like foreach, count, etc.), while those which involve shuffle operations end up in an infinite loop. Spark should somehow indicate this instead of going into an infinite loop.

Thanks
Best Regards

On Thu, Aug 13, 2015 at 11:37 PM, Imran Rashid iras...@cloudera.com wrote:

oh I see, you are defining your own RDD Partition types, and you had a bug where partition.index did not line up with the partition's slot in rdd.getPartitions. Is that correct?

On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

I figured that out, and these are my findings:

- It just enters an infinite loop when there's a duplicate partition id.
- It enters an infinite loop when the partition id starts from 1 rather than 0.

Something like this piece of code can reproduce it (in getPartitions()):

val total_partitions = 4
val partitionsArray: Array[Partition] = Array.ofDim[Partition](total_partitions)
var i = 0
for (outer <- 0 to 1) {
  for (partition <- 1 to total_partitions) {
    partitionsArray(i) = new DeadLockPartitions(partition)
    i = i + 1
  }
}
partitionsArray

Thanks
Best Regards

On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid iras...@cloudera.com wrote:

Yikes. Was this a one-time thing? Or does it happen consistently? Can you turn on debug logging for o.a.s.scheduler (dunno if it will help, but maybe...)?

On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Hi,

My Spark job (running in local[*] with Spark 1.4.1) reads data from a thrift server (I created an RDD; it computes the partitions in the getPartitions() call and, in compute(), hasNext returns records from these partitions). count() and foreach() are working fine and return the correct number of records. But whenever there is a ShuffleMap stage (like reduceByKey etc.), all the tasks execute properly but it enters an infinite loop saying:

15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1 (map at FilterMain.scala:59) because some of its tasks had failed: 0, 3

Here's the complete stack-trace: http://pastebin.com/hyK7cG8S

What could be the root cause of this problem? I looked up and bumped into this closed JIRA (which is very, very old).

Thanks
Best Regards
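[Editor's note] To make the partition contract Mridul describes above concrete, here is a minimal sketch of a custom RDD whose partition indices start at 0, are unique, and line up with their slots in getPartitions(). The class names and the empty compute() body are illustrative placeholders, not code from this thread:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Each partition's index must equal its position in the array returned by getPartitions().
class MyPartition(override val index: Int) extends Partition

class MyCustomRDD(sc: SparkContext, numPartitions: Int) extends RDD[String](sc, Nil) {

  // Indices 0 .. numPartitions-1, no duplicates, index == array slot.
  override def getPartitions: Array[Partition] =
    (0 until numPartitions).map(i => new MyPartition(i): Partition).toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    // Fetch the records belonging to split.index from the external source here.
    Iterator.empty
  }
}

Violating either property (duplicate indices, or indices starting at 1) is exactly what triggered the stage-resubmission loop reported above.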
Re: please help with ClassNotFoundException
Hi, Sea

Problem solved. It turned out that I had updated the Spark cluster to 1.4.1, but the client had not been updated. Thank you so much.

Sea 261810...@qq.com wrote on Fri, Aug 14, 2015 at 1:01 PM:

I have no idea... We use Scala. You upgrade to 1.4 so quickly..., are you using Spark in production? Spark 1.3 is better than Spark 1.4.

-- Original Message --
*From:* 周千昊 z.qian...@gmail.com
*Sent:* Friday, Aug 14, 2015, 11:14 AM
*To:* Sea 261810...@qq.com; dev@spark.apache.org
*Subject:* Re: please help with ClassNotFoundException

Hi Sea,
I have updated Spark to 1.4.1, however the problem still exists. Any idea?

Sea 261810...@qq.com wrote on Fri, Aug 14, 2015 at 12:36 AM:

Yes, I guess so. I have seen this bug before.

-- Original Message --
*From:* 周千昊 z.qian...@gmail.com
*Sent:* Thursday, Aug 13, 2015, 9:30 PM
*To:* Sea 261810...@qq.com; dev@spark.apache.org
*Subject:* Re: please help with ClassNotFoundException

Hi Sea,
Is it the same issue as https://issues.apache.org/jira/browse/SPARK-8368 ?

Sea 261810...@qq.com wrote on Thu, Aug 13, 2015 at 6:52 PM:

Are you using 1.4.0? If yes, use 1.4.1.

-- Original Message --
*From:* 周千昊 qhz...@apache.org
*Sent:* Thursday, Aug 13, 2015, 6:04 PM
*To:* dev dev@spark.apache.org
*Subject:* please help with ClassNotFoundException

Hi,
I am using Spark 1.4 when an issue occurs to me. I am trying to use the aggregate function:

JavaRDD<String> rdd = <some rdd>;
HashMap<Long, TypeA> zeroValue = new HashMap<>();
// add initial key-value pair for zeroValue
rdd.aggregate(zeroValue,
  new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>() { /* implementation */ },
  new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>() { /* implementation */ });

Here is the stack trace when I run the application:

Caused by: java.lang.ClassNotFoundException: TypeA
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:274)
  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
  at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at java.util.HashMap.readObject(HashMap.java:1180)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
  at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
  at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
  at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
  at org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
  at org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)

*However, I have checked that TypeA is in the jar file which is on the classpath.*
*And when I use an empty HashMap as the zeroValue, the exception is gone.*
*Has anyone met the same problem, or can anyone help me with it?*

--
Best Regards
ZhouQianhao

--
Best Regards
ZhouQianhao
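[Editor's note] The stack trace shows the zeroValue being cloned through the serializer (Utils.clone inside RDD.aggregate), which is why any custom class stored in it must deserialize cleanly on both the driver and the executors. A minimal Scala sketch of the same pattern (not the original Java code; TypeA here is a hypothetical serializable class):

import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for TypeA; it must be Serializable and shipped in the application jar.
case class TypeA(count: Long) extends Serializable

object AggregateExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aggregate-example"))
    val rdd = sc.parallelize(Seq("a", "b", "a"))

    // The zero value is cloned via the serializer, so everything inside it must be deserializable.
    val zero = mutable.HashMap.empty[String, TypeA]

    val result = rdd.aggregate(zero)(
      (acc, s) => { acc(s) = TypeA(acc.get(s).map(_.count).getOrElse(0L) + 1); acc },
      (a, b)   => { b.foreach { case (k, v) => a(k) = TypeA(a.get(k).map(_.count).getOrElse(0L) + v.count) }; a }
    )
    println(result)
    sc.stop()
  }
}

As resolved above, a driver/cluster version mismatch can also produce this failure even when the jar itself is correct.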
Introduce a sbt plugin to deploy and submit jobs to a spark cluster on ec2
Hello,

I have written an sbt plugin called spark-deployer, which is able to deploy a standalone Spark cluster on AWS EC2 and submit jobs to it.
https://github.com/pishen/spark-deployer

Compared to the current spark-ec2 script, this design may have several benefits (features):

1. All the code is written in Scala.
2. Just add one line in your project/plugins.sbt and you are ready to go. (You don't have to download the Python code and store it someplace.)
3. The whole development flow (write code for the Spark job, compile the code, launch the cluster, assemble and submit the job to the master, terminate the cluster when the job is finished) can be done in sbt.
4. Supports parallel deployment of the worker machines via Scala's Future.
5. Allows dynamically adding or removing worker machines to/from the current cluster.
6. All the configurations are stored in a Typesafe config file. You don't need to store them elsewhere and map the settings onto spark-ec2's command line arguments.
7. The core library is separated from the sbt plugin, hence it's possible to execute the deployment from an environment without sbt (only a JVM is required).
8. Supports adjustable EC2 root disk size, custom security groups, custom AMI (can run on the default Amazon AMI), custom Spark tarball, and VPC. (Well, most of these are also supported in spark-ec2 in a slightly different form, just mentioning it anyway.)

Since this project is still in its early stage, it lacks some features of spark-ec2 such as self-installed HDFS (we use S3 directly), stoppable clusters, Ganglia, and the copy script. However, it's already usable for our company and we are trying to move our production Spark projects from spark-ec2 to spark-deployer.

Any suggestion, testing help, or pull request is highly appreciated.

On top of that, I would like to contribute this project to Spark, maybe as another choice (suggestion link) alongside spark-ec2 in Spark's official documentation. Of course, before that, I have to make this project stable enough (strange errors just happen on the AWS API from time to time). I'm wondering if this kind of contribution is possible, and whether there is any rule to follow or anyone to contact? (Maybe the source code will not be merged into Spark's main repository, since I've noticed that spark-ec2 is also planning to move out.)

Regards,
Pishen Tsai
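[Editor's note] As an illustration of point 2, enabling an sbt plugin is a one-line addition to project/plugins.sbt. The coordinates below are hypothetical placeholders, not taken from the project; check the spark-deployer README for the actual group, artifact name, and version:

// project/plugins.sbt -- hypothetical coordinates, see the project README for the real ones
addSbtPlugin("net.pishen" % "spark-deployer-sbt" % "x.y.z")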
Re: Fwd: [ANNOUNCE] Spark 1.5.0-preview package
Has anyone had success using this preview? We were able to build the preview and start the spark-master; however, we were unable to connect any spark workers to it. We kept receiving an AkkaRpcEnv "address in use" error while attempting to connect the spark-worker to the master. Also confirmed that the worker was indeed starting on a non-blocking random port as expected, and not dying... just refusing to connect. :/

Built using Maven 3.3.3 with JDK 7, on CentOS 7. We're running a Hadoop 2.7.1 cluster in development (where I attempted to run this), and targeting Hadoop 2.6 worked fine for 1.4.1 at least, so I'm wondering if that is the cause of anything.

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-ANNOUNCE-Spark-1-5-0-preview-package-tp13683p13710.html
avoid creating small objects
Hi,

All I want to do is:
1. read from some source
2. do some calculation to get some byte array
3. write the byte array to HDFS

In Hadoop, I can share an ImmutableByteWritable and do some System.arraycopy; it prevents the application from creating a lot of small objects, which improves the GC latency.

*However, I was wondering if there is any solution like the above in Spark that can avoid creating small objects.*
Re: avoid creating small objects
I am thinking of creating a shared object outside the closure and using this object to hold the byte array. Will this work?

周千昊 qhz...@apache.org wrote on Fri, Aug 14, 2015 at 4:02 PM:

Hi,

All I want to do is:
1. read from some source
2. do some calculation to get some byte array
3. write the byte array to HDFS

In Hadoop, I can share an ImmutableByteWritable and do some System.arraycopy; it prevents the application from creating a lot of small objects, which improves the GC latency.

*However, I was wondering if there is any solution like the above in Spark that can avoid creating small objects.*
Re: Spark runs into an Infinite loop even if the tasks are completed successfully
Thanks for the clarifications Mridul.

Thanks
Best Regards

On Fri, Aug 14, 2015 at 1:04 PM, Mridul Muralidharan mri...@gmail.com wrote:

What I understood from Imran's mail (and what was referenced in his mail) is that the RDD mentioned seems to be violating some basic contracts on how partitions are used in Spark [1]. They cannot be arbitrarily numbered, have duplicates, etc.

Extending RDD to add functionality is typically for niche cases, and it requires subclasses to adhere to the explicit (and implicit) contracts/lifecycles for them. Using existing RDDs as templates would be a good idea for customizations; one way to look at it is that using RDDs is more in API space, but extending them is more in SPI space. Violations would actually not even be detectable by spark-core in the general case.

Regards,
Mridul

[1] Ignoring the array out of bounds, etc. I am assuming the intent is to show overlapping partitions, duplicates, index-to-partition mismatch, that sort of thing.

On Thu, Aug 13, 2015 at 11:42 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Yep, and it works fine for operations which do not involve any shuffle (like foreach, count, etc.), while those which involve shuffle operations end up in an infinite loop. Spark should somehow indicate this instead of going into an infinite loop.

Thanks
Best Regards

On Thu, Aug 13, 2015 at 11:37 PM, Imran Rashid iras...@cloudera.com wrote:

oh I see, you are defining your own RDD Partition types, and you had a bug where partition.index did not line up with the partition's slot in rdd.getPartitions. Is that correct?

On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

I figured that out, and these are my findings:

- It just enters an infinite loop when there's a duplicate partition id.
- It enters an infinite loop when the partition id starts from 1 rather than 0.

Something like this piece of code can reproduce it (in getPartitions()):

val total_partitions = 4
val partitionsArray: Array[Partition] = Array.ofDim[Partition](total_partitions)
var i = 0
for (outer <- 0 to 1) {
  for (partition <- 1 to total_partitions) {
    partitionsArray(i) = new DeadLockPartitions(partition)
    i = i + 1
  }
}
partitionsArray

Thanks
Best Regards

On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid iras...@cloudera.com wrote:

Yikes. Was this a one-time thing? Or does it happen consistently? Can you turn on debug logging for o.a.s.scheduler (dunno if it will help, but maybe...)?

On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Hi,

My Spark job (running in local[*] with Spark 1.4.1) reads data from a thrift server (I created an RDD; it computes the partitions in the getPartitions() call and, in compute(), hasNext returns records from these partitions). count() and foreach() are working fine and return the correct number of records. But whenever there is a ShuffleMap stage (like reduceByKey etc.), all the tasks execute properly but it enters an infinite loop saying:

15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1 (map at FilterMain.scala:59) because some of its tasks had failed: 0, 3

Here's the complete stack-trace: http://pastebin.com/hyK7cG8S

What could be the root cause of this problem? I looked up and bumped into this closed JIRA (which is very, very old).

Thanks
Best Regards
Re: Introduce a sbt plugin to deploy and submit jobs to a spark cluster on ec2
Sorry for the previous line-breaking format; trying to resend the mail again.

I have written an sbt plugin called spark-deployer, which is able to deploy a standalone Spark cluster on AWS EC2 and submit jobs to it.
https://github.com/pishen/spark-deployer

Compared to the current spark-ec2 script, this design may have several benefits (features):

1. All the code is written in Scala.
2. Just add one line in your project/plugins.sbt and you are ready to go. (You don't have to download the Python code and store it someplace.)
3. The whole development flow (write code for the Spark job, compile the code, launch the cluster, assemble and submit the job to the master, terminate the cluster when the job is finished) can be done in sbt.
4. Supports parallel deployment of the worker machines via Scala's Future.
5. Allows dynamically adding or removing worker machines to/from the current cluster.
6. All the configurations are stored in a Typesafe config file. You don't need to store them elsewhere and map the settings onto spark-ec2's command line arguments.
7. The core library is separated from the sbt plugin, hence it's possible to execute the deployment from an environment without sbt (only a JVM is required).
8. Supports adjustable EC2 root disk size, custom security groups, custom AMI (can run on the default Amazon AMI), custom Spark tarball, and VPC. (Well, most of these are also supported in spark-ec2 in a slightly different form, just mentioning it anyway.)

Since this project is still in its early stage, it lacks some features of spark-ec2 such as self-installed HDFS (we use S3 directly), stoppable clusters, Ganglia, and the copy script. However, it's already usable for our company and we are trying to move our production Spark projects from spark-ec2 to spark-deployer.

Any suggestion, testing help, or pull request is highly appreciated.

On top of that, I would like to contribute this project to Spark, maybe as another choice (suggestion link) alongside spark-ec2 in Spark's official documentation. Of course, before that, I have to make this project stable enough (strange errors just happen on the AWS API from time to time). I'm wondering if this kind of contribution is possible, and whether there is any rule to follow or anyone to contact? (Maybe the source code will not be merged into Spark's main repository, since I've noticed that spark-ec2 is also planning to move out.)

Regards,
Pishen Tsai
Re: avoid creating small objects
You can use mapPartitions to do that.

On Friday, August 14, 2015, 周千昊 qhz...@apache.org wrote:

I am thinking of creating a shared object outside the closure and using this object to hold the byte array. Will this work?

周千昊 qhz...@apache.org wrote on Fri, Aug 14, 2015 at 4:02 PM:

Hi,

All I want to do is:
1. read from some source
2. do some calculation to get some byte array
3. write the byte array to HDFS

In Hadoop, I can share an ImmutableByteWritable and do some System.arraycopy; it prevents the application from creating a lot of small objects, which improves the GC latency.

*However, I was wondering if there is any solution like the above in Spark that can avoid creating small objects.*
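[Editor's note] To make the mapPartitions suggestion concrete, here is a minimal sketch of the pattern: per-partition setup runs once, so a buffer created there is shared by every record of that partition instead of being allocated per record. The paths, buffer size, and the trivial "length" computation are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

object ReuseBufferExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("reuse-buffer"))
    val lines = sc.textFile("hdfs:///input/path") // hypothetical input path

    val lengths = lines.mapPartitions { iter =>
      // Allocated once per partition and reused for every record.
      val buffer = new Array[Byte](64 * 1024)
      iter.map { line =>
        val bytes = line.getBytes("UTF-8")
        val n = math.min(bytes.length, buffer.length)
        System.arraycopy(bytes, 0, buffer, 0, n)
        // Emit a value derived from the buffer, never the mutable buffer itself.
        n
      }
    }
    lengths.saveAsTextFile("hdfs:///output/path") // hypothetical output path
    sc.stop()
  }
}

One caveat with any shared mutable buffer: the records emitted downstream must be immutable copies or derived values, since the buffer's contents change on every iteration.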
Re: Automatically deleting pull request comments left by AmplabJenkins
On Fri, Aug 14, 2015 at 4:21 AM, Josh Rosen rosenvi...@gmail.com wrote:

Prototype is at https://github.com/databricks/spark-pr-dashboard/pull/59

On Wed, Aug 12, 2015 at 7:51 PM, Josh Rosen rosenvi...@gmail.com wrote:

*TL;DR*: would anyone object if I wrote a script to auto-delete pull request comments from AmplabJenkins?

Currently there are two bots which post Jenkins test result comments to GitHub, AmplabJenkins and SparkQA.

SparkQA is the account which posts the detailed Jenkins start and finish messages that contain information on which commit is being tested and which tests have failed. This bot is controlled via the dev/run-tests-jenkins script.

AmplabJenkins is controlled by the Jenkins GitHub Pull Request Builder plugin. This bot posts relatively uninformative comments (Merge build triggered, Merge build started, Merge build failed) that do not contain any links or details specific to the tests being run.

Some of these can be configured. For instance, make sure to disable "Use comments to report intermediate phases: triggered et al", and if you add a publicly accessible URL in "Published Jenkins URL", you will get a link to the test result in the test result comment. I know these are global settings, but the Jenkins URL is unique anyway, and intermediate phases are probably equally annoying to everyone. You can see the only comment posted for a successful PR build here: https://github.com/scala-ide/scala-ide/pull/991#issuecomment-128016214

I'd avoid more custom code if possible.

my 2c,
iulian

It is technically non-trivial to prevent these AmplabJenkins comments from being posted in the first place (see https://issues.apache.org/jira/browse/SPARK-4216). However, as a short-term hack I'd like to deploy a script which automatically deletes these comments as soon as they're posted, with an exemption carved out for the "Can an admin approve this patch for testing?" messages. This will help to significantly de-clutter pull request discussions in the GitHub UI. If nobody objects, I'd like to deploy this script sometime in the next few days.

(From a technical perspective, my script uses the GitHub REST API and AmplabJenkins' own OAuth token to delete the comments. The final deployment environment will most likely be the backend of http://spark-prs.appspot.com.)

- Josh

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com
Re: Writing to multiple outputs in Spark
A workaround would be to have multiple passes over the RDD, with each pass writing its own output? Or do it in a single pass inside a foreachPartition (open up multiple files per partition to write out)?

-Abhishek-

On Aug 14, 2015, at 7:56 AM, Silas Davis si...@silasdavis.net wrote:

Would it be right to assume that the silence on this topic implies others don't really have this issue/desire?

On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:

tl;dr: hadoop and cascading provide ways of writing tuples to multiple output files based on key, but the plain RDD interface doesn't seem to, and it should.

I have been looking into ways to write to multiple outputs in Spark. It seems like a feature that is somewhat missing from Spark.

The idea is to partition output and write the elements of an RDD to different locations depending on the key. For example, in a pair RDD your key may be (language, date, userId) and you would like to write separate files to $someBasePath/$language/$date. Then there would be a version of saveAsHadoopDataset that would be able to write to multiple locations based on key using the underlying OutputFormat. Perhaps it would take a pair RDD with keys ($partitionKey, $realKey), so for example ((language, date), userId).

The prior art I have found on this is the following.

Using SparkSQL: The 'partitionBy' method of DataFrameWriter: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter This only works for parquet at the moment.

Using Spark/Hadoop: This pull request (with the hadoop1 API): https://github.com/apache/spark/pull/4895/files. This uses MultipleTextOutputFormat (which in turn uses MultipleOutputFormat), which is part of the old hadoop1 API. It only works for text but could be generalised for any underlying OutputFormat by using MultipleOutputFormat (but only for hadoop1, which doesn't support ParquetAvroOutputFormat, for example).

This gist (with the hadoop2 API): https://gist.github.com/mlehman/df9546f6be2e362bbad2 This uses MultipleOutputs (available for both the old and new hadoop APIs) and extends saveAsNewHadoopDataset to support multiple outputs. Should work for any underlying OutputFormat. Probably better implemented by extending saveAs[NewAPI]HadoopDataset.

In Cascading: Cascading provides PartitionTap: http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html to do this.

So my questions are: is there a reason why Spark doesn't provide this? Does Spark provide similar functionality through some other mechanism? How would it be best implemented?

Since I started composing this message I've had a go at writing a wrapper OutputFormat that writes multiple outputs using hadoop MultipleOutputs but doesn't require modification of the PairRDDFunctions. The principle is similar, however. Again it feels slightly hacky to use dummy fields for the ReduceContextImpl, but some of this may be a part of the impedance mismatch between Spark and plain Hadoop... Here is my attempt: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

I'd like to see this functionality in Spark somehow but invite suggestions on how best to achieve it.

Thanks,
Silas
Re: Writing to multiple outputs in Spark
This is already supported with the new partitioned data sources in DataFrame/SQL, right?

On Fri, Aug 14, 2015 at 8:04 AM, Alex Angelini alex.angel...@shopify.com wrote:

Speaking about Shopify's deployment, this would be a really nice-to-have feature. We would like to write data to folders with the structure `year/month/day` but have had to hold off on that because of the lack of support for MultipleOutputs.

On Fri, Aug 14, 2015 at 10:56 AM, Silas Davis si...@silasdavis.net wrote:

Would it be right to assume that the silence on this topic implies others don't really have this issue/desire?

On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:

*tl;dr: hadoop and cascading provide ways of writing tuples to multiple output files based on key, but the plain RDD interface doesn't seem to, and it should.*

I have been looking into ways to write to multiple outputs in Spark. It seems like a feature that is somewhat missing from Spark.

The idea is to partition output and write the elements of an RDD to different locations depending on the key. For example, in a pair RDD your key may be (language, date, userId) and you would like to write separate files to $someBasePath/$language/$date. Then there would be a version of saveAsHadoopDataset that would be able to write to multiple locations based on key using the underlying OutputFormat. Perhaps it would take a pair RDD with keys ($partitionKey, $realKey), so for example ((language, date), userId).

The prior art I have found on this is the following.

Using SparkSQL: The 'partitionBy' method of DataFrameWriter: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter This only works for parquet at the moment.

Using Spark/Hadoop: This pull request (with the hadoop1 API): https://github.com/apache/spark/pull/4895/files. This uses MultipleTextOutputFormat (which in turn uses MultipleOutputFormat), which is part of the old hadoop1 API. It only works for text but could be generalised for any underlying OutputFormat by using MultipleOutputFormat (but only for hadoop1, which doesn't support ParquetAvroOutputFormat, for example).

This gist (with the hadoop2 API): https://gist.github.com/mlehman/df9546f6be2e362bbad2 This uses MultipleOutputs (available for both the old and new hadoop APIs) and extends saveAsNewHadoopDataset to support multiple outputs. Should work for any underlying OutputFormat. Probably better implemented by extending saveAs[NewAPI]HadoopDataset.

In Cascading: Cascading provides PartitionTap: http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html to do this.

So my questions are: is there a reason why Spark doesn't provide this? Does Spark provide similar functionality through some other mechanism? How would it be best implemented?

Since I started composing this message I've had a go at writing a wrapper OutputFormat that writes multiple outputs using hadoop MultipleOutputs but doesn't require modification of the PairRDDFunctions. The principle is similar, however. Again it feels slightly hacky to use dummy fields for the ReduceContextImpl, but some of this may be a part of the impedance mismatch between Spark and plain Hadoop... Here is my attempt: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

I'd like to see this functionality in Spark somehow but invite suggestions on how best to achieve it.

Thanks,
Silas
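[Editor's note] The DataFrame path mentioned above looks roughly like the sketch below (assuming Spark 1.4+, a hypothetical events DataFrame with language and date columns, and Parquet output, which is the format partitionBy supported at the time):

import org.apache.spark.sql.SQLContext

// sc is an existing SparkContext; the input path and schema are placeholders.
val sqlContext = new SQLContext(sc)
val events = sqlContext.read.json("hdfs:///logs/events.json")

// Writes one directory per (language, date) combination, e.g.
// hdfs:///out/events/language=en/date=2015-08-14/part-*
events.write
  .partitionBy("language", "date")
  .parquet("hdfs:///out/events")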
Setting up Spark/flume/? to Ingest 10TB from FTP
What is the best way to bring such a huge (10TB) file from an FTP server into Hadoop to persist in HDFS? Since a single JVM process might run out of memory, I was wondering if I can use Spark or Flume to do this. Any help on this matter is appreciated. I prefer an application/process running inside Hadoop which is doing this transfer.

Thanks.
Re: Writing to multiple outputs in Spark
See: https://issues.apache.org/jira/browse/SPARK-3533

Feel free to comment there and make a case if you think the issue should be reopened.

Nick

On Fri, Aug 14, 2015 at 11:11 AM Abhishek R. Singh abhis...@tetrationanalytics.com wrote:

A workaround would be to have multiple passes over the RDD, with each pass writing its own output? Or do it in a single pass inside a foreachPartition (open up multiple files per partition to write out)?

-Abhishek-

On Aug 14, 2015, at 7:56 AM, Silas Davis si...@silasdavis.net wrote:

Would it be right to assume that the silence on this topic implies others don't really have this issue/desire?

On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:

*tl;dr: hadoop and cascading provide ways of writing tuples to multiple output files based on key, but the plain RDD interface doesn't seem to, and it should.*

I have been looking into ways to write to multiple outputs in Spark. It seems like a feature that is somewhat missing from Spark.

The idea is to partition output and write the elements of an RDD to different locations depending on the key. For example, in a pair RDD your key may be (language, date, userId) and you would like to write separate files to $someBasePath/$language/$date. Then there would be a version of saveAsHadoopDataset that would be able to write to multiple locations based on key using the underlying OutputFormat. Perhaps it would take a pair RDD with keys ($partitionKey, $realKey), so for example ((language, date), userId).

The prior art I have found on this is the following.

Using SparkSQL: The 'partitionBy' method of DataFrameWriter: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter This only works for parquet at the moment.

Using Spark/Hadoop: This pull request (with the hadoop1 API): https://github.com/apache/spark/pull/4895/files. This uses MultipleTextOutputFormat (which in turn uses MultipleOutputFormat), which is part of the old hadoop1 API. It only works for text but could be generalised for any underlying OutputFormat by using MultipleOutputFormat (but only for hadoop1, which doesn't support ParquetAvroOutputFormat, for example).

This gist (with the hadoop2 API): https://gist.github.com/mlehman/df9546f6be2e362bbad2 This uses MultipleOutputs (available for both the old and new hadoop APIs) and extends saveAsNewHadoopDataset to support multiple outputs. Should work for any underlying OutputFormat. Probably better implemented by extending saveAs[NewAPI]HadoopDataset.

In Cascading: Cascading provides PartitionTap: http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html to do this.

So my questions are: is there a reason why Spark doesn't provide this? Does Spark provide similar functionality through some other mechanism? How would it be best implemented?

Since I started composing this message I've had a go at writing a wrapper OutputFormat that writes multiple outputs using hadoop MultipleOutputs but doesn't require modification of the PairRDDFunctions. The principle is similar, however. Again it feels slightly hacky to use dummy fields for the ReduceContextImpl, but some of this may be a part of the impedance mismatch between Spark and plain Hadoop... Here is my attempt: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

I'd like to see this functionality in Spark somehow but invite suggestions on how best to achieve it.

Thanks,
Silas
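[Editor's note] For the RDD path discussed in this thread, the hadoop1 approach behind the linked pull request can be sketched as follows. This is a hedged illustration, not the PR's code: the class name, key layout, and paths are placeholders.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each record to a file derived from its key, e.g. <basePath>/en/2015-08-14/part-00000.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get() // drop the key from the written lines

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    val (language, date) = key.asInstanceOf[(String, String)]
    s"$language/$date/$name"
  }
}

// Usage sketch: keys are (language, date) pairs, values are the records to write.
// pairs.saveAsHadoopFile("hdfs:///out/events", classOf[Any], classOf[Any],
//   classOf[RDDMultipleTextOutputFormat])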
Re: Setting up Spark/flume/? to Ingest 10TB from FTP
Why do you need to use Spark or Flume for this? You can just use curl and hdfs:

curl ftp://blah | hdfs dfs -put - /blah

On Fri, Aug 14, 2015 at 1:15 PM, Varadhan, Jawahar varad...@yahoo.com.invalid wrote:

What is the best way to bring such a huge (10TB) file from an FTP server into Hadoop to persist in HDFS? Since a single JVM process might run out of memory, I was wondering if I can use Spark or Flume to do this. Any help on this matter is appreciated. I prefer an application/process running inside Hadoop which is doing this transfer.

Thanks.

--
Marcelo
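[Editor's note] If the transfer really must run as a JVM process inside the cluster rather than via curl, a small program can stream the file into HDFS without ever holding it in memory. A rough sketch, with a placeholder FTP URL and HDFS path and no retry or checksum handling (the concerns raised later in this thread):

import java.net.URL
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

object FtpToHdfs {
  def main(args: Array[String]): Unit = {
    val in = new URL("ftp://user:pass@host/path/huge.file").openStream() // placeholder URL
    val fs = FileSystem.get(new Configuration())
    val out = fs.create(new Path("/data/huge.file"))                     // placeholder HDFS path
    try {
      // Copies in fixed-size chunks, so memory use stays constant regardless of file size.
      IOUtils.copyBytes(in, out, 64 * 1024, false)
    } finally {
      IOUtils.closeStream(in)
      IOUtils.closeStream(out)
    }
  }
}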
Re: SparkR DataFrame fail to return data of Decimal type
Thanks for the catch. Could you send a PR with this diff?

On Fri, Aug 14, 2015 at 10:30 AM, Shkurenko, Alex ashkure...@enova.com wrote:

Got an issue similar to https://issues.apache.org/jira/browse/SPARK-8897, but with the Decimal datatype coming from a Postgres DB:

// Set up SparkR
Sys.setenv(SPARK_HOME="/Users/ashkurenko/work/git_repos/spark")
Sys.setenv(SPARKR_SUBMIT_ARGS="--driver-class-path ~/Downloads/postgresql-9.4-1201.jdbc4.jar sparkr-shell")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")

// Connect to a Postgres DB via JDBC
sqlContext <- sparkRSQL.init(sc)
sql(sqlContext, "CREATE TEMPORARY TABLE mytable
  USING org.apache.spark.sql.jdbc
  OPTIONS (url 'jdbc:postgresql://servername:5432/dbname', dbtable 'mydbtable')")

// Try pulling a Decimal column from a table
myDataFrame <- sql(sqlContext, "select a_decimal_column from mytable")

// The schema shows up fine
show(myDataFrame)
DataFrame[a_decimal_column:decimal(10,0)]

schema(myDataFrame)
StructType
|-name = "a_decimal_column", type = "DecimalType(10,0)", nullable = TRUE

// ... but pulling data fails:
localDF <- collect(myDataFrame)
Error in as.data.frame.default(x[[i]], optional = TRUE) :
  cannot coerce class "jobj" to a data.frame

---

Proposed fix:

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index d5b4260..b77ae2a 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -219,6 +219,9 @@ private[spark] object SerDe {
       case "float" | "java.lang.Float" =>
         writeType(dos, "double")
         writeDouble(dos, value.asInstanceOf[Float].toDouble)
+      case "decimal" | "java.math.BigDecimal" =>
+        writeType(dos, "double")
+        writeDouble(dos, scala.math.BigDecimal(value.asInstanceOf[java.math.BigDecimal]).toDouble)
       case "double" | "java.lang.Double" =>
         writeType(dos, "double")
         writeDouble(dos, value.asInstanceOf[Double])

Thanks,
Alex
Reliance on java.math.BigInteger implementation
ref: https://issues.apache.org/jira/browse/SPARK-9370

The code to handle BigInteger types in org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters.java and org.apache.spark.unsafe.Platform.java is dependent on the implementation of java.math.BigInteger, e.g.:

try {
  signumOffset = _UNSAFE.objectFieldOffset(BigInteger.class.getDeclaredField("signum"));
  magOffset = _UNSAFE.objectFieldOffset(BigInteger.class.getDeclaredField("mag"));
} catch (Exception ex) {
  // should not happen
}

This relies on there being fields "int signum" and "int[] mag". These implementation fields are not part of the Java specification for this class, so they cannot be relied upon. We are running Spark on IBM JDKs, and their spec-compliant implementation has different internal fields. This causes an abort when running on these Java runtimes. There is also no guarantee that any future implementations of OpenJDK will maintain these field names.

I think we need to find an implementation of these Spark functions that relies only on Java-compliant classes rather than on specific implementations.

Any thoughts?

Cheers,
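[Editor's note] One possible direction, sketched here only to illustrate the idea and not as a drop-in replacement for the Unsafe-based fast path, is to go through BigInteger's public API, which every compliant JDK must provide, instead of reading its private fields:

import java.math.BigInteger

object PortableBigInt {
  // Portable encoding using only spec-guaranteed methods:
  // signum() plus the big-endian magnitude bytes from abs().toByteArray().
  def encode(bi: BigInteger): (Int, Array[Byte]) =
    (bi.signum(), bi.abs().toByteArray())

  // Rebuilds the value with the public BigInteger(signum, magnitude) constructor.
  def decode(signum: Int, magnitude: Array[Byte]): BigInteger =
    new BigInteger(signum, magnitude)
}

The trade-off is an extra array copy compared with reading the internal int[] directly, which is presumably why the Unsafe approach was taken in the first place.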
SparkR DataFrame fail to return data of Decimal type
Got an issue similar to https://issues.apache.org/jira/browse/SPARK-8897, but with the Decimal datatype coming from a Postgres DB:

// Set up SparkR
Sys.setenv(SPARK_HOME="/Users/ashkurenko/work/git_repos/spark")
Sys.setenv(SPARKR_SUBMIT_ARGS="--driver-class-path ~/Downloads/postgresql-9.4-1201.jdbc4.jar sparkr-shell")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")

// Connect to a Postgres DB via JDBC
sqlContext <- sparkRSQL.init(sc)
sql(sqlContext, "CREATE TEMPORARY TABLE mytable
  USING org.apache.spark.sql.jdbc
  OPTIONS (url 'jdbc:postgresql://servername:5432/dbname', dbtable 'mydbtable')")

// Try pulling a Decimal column from a table
myDataFrame <- sql(sqlContext, "select a_decimal_column from mytable")

// The schema shows up fine
show(myDataFrame)
DataFrame[a_decimal_column:decimal(10,0)]

schema(myDataFrame)
StructType
|-name = "a_decimal_column", type = "DecimalType(10,0)", nullable = TRUE

// ... but pulling data fails:
localDF <- collect(myDataFrame)
Error in as.data.frame.default(x[[i]], optional = TRUE) :
  cannot coerce class "jobj" to a data.frame

---

Proposed fix:

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index d5b4260..b77ae2a 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -219,6 +219,9 @@ private[spark] object SerDe {
       case "float" | "java.lang.Float" =>
         writeType(dos, "double")
         writeDouble(dos, value.asInstanceOf[Float].toDouble)
+      case "decimal" | "java.math.BigDecimal" =>
+        writeType(dos, "double")
+        writeDouble(dos, scala.math.BigDecimal(value.asInstanceOf[java.math.BigDecimal]).toDouble)
       case "double" | "java.lang.Double" =>
         writeType(dos, "double")
         writeDouble(dos, value.asInstanceOf[Double])

Thanks,
Alex
Re: SparkR DataFrame fail to return data of Decimal type
Created https://issues.apache.org/jira/browse/SPARK-9982, working on the PR.

On Fri, Aug 14, 2015 at 12:43 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote:

Thanks for the catch. Could you send a PR with this diff?

On Fri, Aug 14, 2015 at 10:30 AM, Shkurenko, Alex ashkure...@enova.com wrote:

Got an issue similar to https://issues.apache.org/jira/browse/SPARK-8897, but with the Decimal datatype coming from a Postgres DB:

// Set up SparkR
Sys.setenv(SPARK_HOME="/Users/ashkurenko/work/git_repos/spark")
Sys.setenv(SPARKR_SUBMIT_ARGS="--driver-class-path ~/Downloads/postgresql-9.4-1201.jdbc4.jar sparkr-shell")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")

// Connect to a Postgres DB via JDBC
sqlContext <- sparkRSQL.init(sc)
sql(sqlContext, "CREATE TEMPORARY TABLE mytable
  USING org.apache.spark.sql.jdbc
  OPTIONS (url 'jdbc:postgresql://servername:5432/dbname', dbtable 'mydbtable')")

// Try pulling a Decimal column from a table
myDataFrame <- sql(sqlContext, "select a_decimal_column from mytable")

// The schema shows up fine
show(myDataFrame)
DataFrame[a_decimal_column:decimal(10,0)]

schema(myDataFrame)
StructType
|-name = "a_decimal_column", type = "DecimalType(10,0)", nullable = TRUE

// ... but pulling data fails:
localDF <- collect(myDataFrame)
Error in as.data.frame.default(x[[i]], optional = TRUE) :
  cannot coerce class "jobj" to a data.frame

---

Proposed fix:

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index d5b4260..b77ae2a 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -219,6 +219,9 @@ private[spark] object SerDe {
       case "float" | "java.lang.Float" =>
         writeType(dos, "double")
         writeDouble(dos, value.asInstanceOf[Float].toDouble)
+      case "decimal" | "java.math.BigDecimal" =>
+        writeType(dos, "double")
+        writeDouble(dos, scala.math.BigDecimal(value.asInstanceOf[java.math.BigDecimal]).toDouble)
       case "double" | "java.lang.Double" =>
         writeType(dos, "double")
         writeDouble(dos, value.asInstanceOf[Double])

Thanks,
Alex
Re: Automatically deleting pull request comments left by AmplabJenkins
I think that I'm still going to want some custom code to remove the build start messages from SparkQA, and it's hardly any code, so I'm going to stick with the custom approach for now. The problem is that I don't want _any_ posts from AmplabJenkins, even if they're improved to be more informative, since our custom SparkQA provides nicer output.

On Fri, Aug 14, 2015 at 1:57 AM, Iulian Dragoș iulian.dra...@typesafe.com wrote:

On Fri, Aug 14, 2015 at 4:21 AM, Josh Rosen rosenvi...@gmail.com wrote:

Prototype is at https://github.com/databricks/spark-pr-dashboard/pull/59

On Wed, Aug 12, 2015 at 7:51 PM, Josh Rosen rosenvi...@gmail.com wrote:

*TL;DR*: would anyone object if I wrote a script to auto-delete pull request comments from AmplabJenkins?

Currently there are two bots which post Jenkins test result comments to GitHub, AmplabJenkins and SparkQA.

SparkQA is the account which posts the detailed Jenkins start and finish messages that contain information on which commit is being tested and which tests have failed. This bot is controlled via the dev/run-tests-jenkins script.

AmplabJenkins is controlled by the Jenkins GitHub Pull Request Builder plugin. This bot posts relatively uninformative comments (Merge build triggered, Merge build started, Merge build failed) that do not contain any links or details specific to the tests being run.

Some of these can be configured. For instance, make sure to disable "Use comments to report intermediate phases: triggered et al", and if you add a publicly accessible URL in "Published Jenkins URL", you will get a link to the test result in the test result comment. I know these are global settings, but the Jenkins URL is unique anyway, and intermediate phases are probably equally annoying to everyone. You can see the only comment posted for a successful PR build here: https://github.com/scala-ide/scala-ide/pull/991#issuecomment-128016214

I'd avoid more custom code if possible.

my 2c,
iulian

It is technically non-trivial to prevent these AmplabJenkins comments from being posted in the first place (see https://issues.apache.org/jira/browse/SPARK-4216). However, as a short-term hack I'd like to deploy a script which automatically deletes these comments as soon as they're posted, with an exemption carved out for the "Can an admin approve this patch for testing?" messages. This will help to significantly de-clutter pull request discussions in the GitHub UI. If nobody objects, I'd like to deploy this script sometime in the next few days.

(From a technical perspective, my script uses the GitHub REST API and AmplabJenkins' own OAuth token to delete the comments. The final deployment environment will most likely be the backend of http://spark-prs.appspot.com.)

- Josh

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com
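[Editor's note] For reference, deleting a PR comment through the GitHub REST API is a single authenticated DELETE call; a minimal sketch follows. The owner/repo, comment id, and token handling are placeholders, and this is not the actual spark-prs code:

import java.net.{HttpURLConnection, URL}

object DeleteGitHubComment {
  // Deletes one issue/PR comment: DELETE /repos/{owner}/{repo}/issues/comments/{id}
  def delete(owner: String, repo: String, commentId: Long, token: String): Int = {
    val url = new URL(s"https://api.github.com/repos/$owner/$repo/issues/comments/$commentId")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("DELETE")
    conn.setRequestProperty("Authorization", s"token $token")
    try conn.getResponseCode // 204 means the comment was deleted
    finally conn.disconnect()
  }

  def main(args: Array[String]): Unit = {
    // Placeholder values; a real script would look up comment ids and read the token from config.
    println(delete("apache", "spark", 123456789L, sys.env.getOrElse("GITHUB_TOKEN", "")))
  }
}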
SPARK-10000 + now
Five months ago we reached 10000 commits on GitHub. Today we reached 10000 JIRA tickets.

https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20created%3E%3D-1w%20ORDER%20BY%20created%20DESC

Hopefully the extra character we have to type doesn't bring our productivity down much.
Jenkins having issues?
Hi devs,

Jenkins failed twice in my PR https://github.com/apache/spark/pull/8216 with an unknown error:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/40930/console
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/40931/console

Can you help? Thank you!
Cheolsoo
Re: Setting up Spark/flume/? to Ingest 10TB from FTP
Well, what do you do in case of failure? I think one should use a professional ingestion tool that ideally does not need to reload everything in case of failure and verifies that the file has been transferred correctly via checksums. I am not sure if Flume supports FTP, but SSH/SCP should be supported. You may also check other Flume sources or write your own in the case of FTP (taking into account the comments above). I hope your file is compressed.

On Fri, Aug 14, 2015 at 10:23 PM, Marcelo Vanzin van...@cloudera.com wrote:

Why do you need to use Spark or Flume for this? You can just use curl and hdfs:

curl ftp://blah | hdfs dfs -put - /blah

On Fri, Aug 14, 2015 at 1:15 PM, Varadhan, Jawahar varad...@yahoo.com.invalid wrote:

What is the best way to bring such a huge (10TB) file from an FTP server into Hadoop to persist in HDFS? Since a single JVM process might run out of memory, I was wondering if I can use Spark or Flume to do this. Any help on this matter is appreciated. I prefer an application/process running inside Hadoop which is doing this transfer.

Thanks.

--
Marcelo
Re: Fwd: [ANNOUNCE] Spark 1.5.0-preview package
Is it possible that you have only upgraded some set of nodes but not the others? We have run some performance benchmarks on this, so it definitely runs in some configuration. It could still be buggy in some other configurations though.

On Fri, Aug 14, 2015 at 6:37 AM, mkhaitman mark.khait...@chango.com wrote:

Has anyone had success using this preview? We were able to build the preview and start the spark-master; however, we were unable to connect any spark workers to it. We kept receiving an AkkaRpcEnv "address in use" error while attempting to connect the spark-worker to the master. Also confirmed that the worker was indeed starting on a non-blocking random port as expected, and not dying... just refusing to connect. :/

Built using Maven 3.3.3 with JDK 7, on CentOS 7. We're running a Hadoop 2.7.1 cluster in development (where I attempted to run this), and targeting Hadoop 2.6 worked fine for 1.4.1 at least, so I'm wondering if that is the cause of anything.

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Fwd-ANNOUNCE-Spark-1-5-0-preview-package-tp13683p13710.html
Re: Automatically deleting pull request comments left by AmplabJenkins
The updated prototype listed in https://github.com/databricks/spark-pr-dashboard/pull/59 is now running live on spark-prs as part of its PR comment update task.

On Fri, Aug 14, 2015 at 10:51 AM, Josh Rosen rosenvi...@gmail.com wrote:

I think that I'm still going to want some custom code to remove the build start messages from SparkQA, and it's hardly any code, so I'm going to stick with the custom approach for now. The problem is that I don't want _any_ posts from AmplabJenkins, even if they're improved to be more informative, since our custom SparkQA provides nicer output.

On Fri, Aug 14, 2015 at 1:57 AM, Iulian Dragoș iulian.dra...@typesafe.com wrote:

On Fri, Aug 14, 2015 at 4:21 AM, Josh Rosen rosenvi...@gmail.com wrote:

Prototype is at https://github.com/databricks/spark-pr-dashboard/pull/59

On Wed, Aug 12, 2015 at 7:51 PM, Josh Rosen rosenvi...@gmail.com wrote:

*TL;DR*: would anyone object if I wrote a script to auto-delete pull request comments from AmplabJenkins?

Currently there are two bots which post Jenkins test result comments to GitHub, AmplabJenkins and SparkQA.

SparkQA is the account which posts the detailed Jenkins start and finish messages that contain information on which commit is being tested and which tests have failed. This bot is controlled via the dev/run-tests-jenkins script.

AmplabJenkins is controlled by the Jenkins GitHub Pull Request Builder plugin. This bot posts relatively uninformative comments (Merge build triggered, Merge build started, Merge build failed) that do not contain any links or details specific to the tests being run.

Some of these can be configured. For instance, make sure to disable "Use comments to report intermediate phases: triggered et al", and if you add a publicly accessible URL in "Published Jenkins URL", you will get a link to the test result in the test result comment. I know these are global settings, but the Jenkins URL is unique anyway, and intermediate phases are probably equally annoying to everyone. You can see the only comment posted for a successful PR build here: https://github.com/scala-ide/scala-ide/pull/991#issuecomment-128016214

I'd avoid more custom code if possible.

my 2c,
iulian

It is technically non-trivial to prevent these AmplabJenkins comments from being posted in the first place (see https://issues.apache.org/jira/browse/SPARK-4216). However, as a short-term hack I'd like to deploy a script which automatically deletes these comments as soon as they're posted, with an exemption carved out for the "Can an admin approve this patch for testing?" messages. This will help to significantly de-clutter pull request discussions in the GitHub UI. If nobody objects, I'd like to deploy this script sometime in the next few days.

(From a technical perspective, my script uses the GitHub REST API and AmplabJenkins' own OAuth token to delete the comments. The final deployment environment will most likely be the backend of http://spark-prs.appspot.com.)

- Josh

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com