Re: using MultipleOutputFormat to ensure one output file per key
Hi,

Arpan Ghosh wrote:
Hi,
How can I implement a custom MultipleOutputFormat and specify it as the output of my Spark job so that I can ensure there is a unique output file per key (instead of a unique output file per reducer)?
Thanks
Arpan

I use something like this:

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {

  override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) =
    key.toString() + "/" + leaf

  override protected def generateActualKey(key: T, value: V) = null

  // this could be dangerous and overwrite files
  @throws(classOf[FileAlreadyExistsException])
  @throws(classOf[InvalidJobConfException])
  @throws(classOf[IOException])
  override def checkOutputSpecs(ignored: FileSystem, job: JobConf) = { }
}

and then just set up a JobConf:

val jobConf = new JobConf(self.context.hadoopConfiguration)
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[String])
jobConf.setOutputFormat(classOf[KeyBasedOutput[String, String]])
rdd.saveAsHadoopDataset(jobConf)

/Rafal

--
Regards
Rafał Kwasny
mailto:/jabberid: m...@entropy.be
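For readers wiring this up end to end, here is a minimal sketch (not from the original thread) of how the output format above might be used. The sample pairs and the output path are placeholder assumptions, and FileOutputFormat.setOutputPath is needed because saveAsHadoopDataset takes the destination from the JobConf:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.{SparkConf, SparkContext}

object PerKeyOutputExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-key-output"))

    // Placeholder data: each key ends up under its own subdirectory,
    // e.g. /tmp/per-key-out/2014-10-01/part-00000
    val rdd = sc.parallelize(Seq(
      ("2014-10-01", "event a"),
      ("2014-10-01", "event b"),
      ("2014-10-02", "event c")
    ))

    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputKeyClass(classOf[String])
    jobConf.setOutputValueClass(classOf[String])
    jobConf.setOutputFormat(classOf[KeyBasedOutput[String, String]])
    FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/per-key-out")) // placeholder path

    rdd.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}

Because generateActualKey returns null, only the values appear in the output files; the key survives only as the directory name.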
Re: Spark output to s3 extremely slow
Hi,

How large is the dataset you're saving into S3?

Saving to S3 is actually done in two steps:
1) writing temporary files
2) committing them to the proper directory

Step 2) can be slow because S3 does not have a quick atomic move operation; you have to copy (server side, but it still takes time) and then delete the original.

I've overcome this by using a JobConf with a NullOutputCommitter:

jobConf.setOutputCommitter(classOf[NullOutputCommitter])

where NullOutputCommitter is a class that doesn't do anything:

class NullOutputCommitter extends OutputCommitter {
  def abortTask(taskContext: TaskAttemptContext) = { }
  override def cleanupJob(jobContext: JobContext) = { }
  def commitTask(taskContext: TaskAttemptContext) = { }
  def needsTaskCommit(taskContext: TaskAttemptContext) = false
  def setupJob(jobContext: JobContext) { }
  def setupTask(taskContext: TaskAttemptContext) { }
}

This works, but maybe someone has a better solution.

/Raf

anny9699 wrote:
Hi,
I found that writing output back to S3 using rdd.saveAsTextFile() is extremely slow, much slower than reading from S3. Is there a way to make it faster? The RDD has 150 partitions, so I assume parallelism is sufficient.
Thanks a lot!
Anny
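As a point of reference, here is a minimal sketch (not from the original thread) of wiring the committer above into a save. It follows the saveAsHadoopDataset pattern from the previous thread rather than saveAsTextFile, since saveAsTextFile does not accept a JobConf; the bucket and path are placeholders:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def saveToS3(sc: SparkContext, rdd: RDD[(String, String)]): Unit = {
  val jobConf = new JobConf(sc.hadoopConfiguration)
  jobConf.setOutputKeyClass(classOf[String])
  jobConf.setOutputValueClass(classOf[String])
  jobConf.setOutputFormat(classOf[TextOutputFormat[String, String]])
  // Skip the slow copy-then-delete commit step on S3
  jobConf.setOutputCommitter(classOf[NullOutputCommitter])
  FileOutputFormat.setOutputPath(jobConf, new Path("s3n://my-bucket/output")) // placeholder
  rdd.saveAsHadoopDataset(jobConf)
}

Note that skipping the commit step trades safety for speed: a failed or re-run task can leave partial files in the destination, which is presumably why the author adds "maybe someone has a better solution."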
Re: S3 Bucket Access
Hi,

Keep in mind that you're going to have a bad time if your secret key contains a "/". This is due to an old and stupid Hadoop bug: https://issues.apache.org/jira/browse/HADOOP-3733

The best way is to regenerate the key so it does not include a "/".

/Raf

Akhil Das wrote:
Try the following:

1. Set the access key and secret key in the sparkContext:
sparkContext.set("AWS_ACCESS_KEY_ID", yourAccessKey)
sparkContext.set("AWS_SECRET_ACCESS_KEY", yourSecretKey)

2. Set the access key and secret key in the environment before starting your application:
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>

3. Set the access key and secret key inside the Hadoop configuration:
val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", yourAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", yourSecretKey)

4. You can also try:
val lines = sparkContext.textFile("s3n://yourAccessKey:yourSecretKey@yourBucket/path/")

Thanks
Best Regards

On Mon, Oct 13, 2014 at 11:33 PM, Ranga <sra...@gmail.com> wrote:
Hi

I am trying to access files/buckets in S3 and encountering a permissions issue. The buckets are configured to authenticate using an IAMRole provider. I have set the KeyId and Secret using environment variables (AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID). However, I am still unable to access the S3 buckets.

Before setting the access key and secret, the error was:
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

After setting the access key and secret, the error is: "The AWS Access Key Id you provided does not exist in our records."

The id/secret being set are the right values. This makes me believe that something else (token, etc.) needs to be set as well. Any help is appreciated.

- Ranga
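Since the first error message refers to the fs.s3n.* properties, here is a minimal sketch (an illustration, not from the thread) of setting them on the Hadoop configuration before reading. The bucket and path are placeholders, and the credentials are assumed to be in the environment:

import org.apache.spark.{SparkConf, SparkContext}

object S3ReadExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("s3-read"))

    // s3n:// URLs are served by NativeS3FileSystem; note the HADOOP-3733 caveat above:
    // a secret key containing "/" will not work here.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

    val lines = sc.textFile("s3n://your-bucket/path/") // placeholder bucket and path
    println(lines.count())
    sc.stop()
  }
}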
Re: Having spark-ec2 join new slaves to existing cluster
Hi,

This will work nicely unless you're using spot instances; in that case "start" does not work, because the slaves are lost on shutdown.

I feel like the spark-ec2 script needs a major refactor to cope with new features and with more users running it in dynamic environments. Are there any current plans to migrate it to a CDH5-based install (just released)?

/Raf

Nicholas Chammas wrote:
Sweet, thanks for the instructions. This will do for resizing a dev cluster that you can bring down at will. I will open a JIRA issue about adding the functionality I described to spark-ec2.

On Fri, Apr 4, 2014 at 3:43 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
This can't be done through the script right now, but you can do it manually as long as the cluster is stopped. If the cluster is stopped, just go into the AWS Console, right-click a slave and choose "launch more of these" to add more, or select multiple slaves and delete them. The next time you run spark-ec2 start, it will set up the cluster on all the machines it finds in the mycluster-slaves security group.

This is pretty hacky, so it would definitely be good to add this feature; feel free to open a JIRA about it.

Matei

On Apr 4, 2014, at 12:16 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
I would like to be able to use spark-ec2 to launch new slaves and add them to an existing, running cluster. Similarly, I would also like to remove slaves from an existing cluster.

Use cases include:
1. Oh snap, I sized my cluster incorrectly. Let me add/remove some slaves.
2. During scheduled batch processing, I want to add some new slaves, perhaps on spot instances. When that processing is done, I want to kill them. (Cruel, I know.)

I gather this is not possible at the moment. spark-ec2 appears to be able to launch new slaves for an existing cluster only if the master is stopped. I also do not see any ability to remove slaves from a cluster. Is that correct? Are there plans to add such functionality to spark-ec2 in the future?

Nick
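For concreteness, the manual resize workflow described above might look roughly like the following. The key pair, identity file, and cluster name are placeholder assumptions, and the exact spark-ec2 flags may vary between Spark versions:

# 1. Stop the cluster so none of its instances are running
./spark-ec2 -k my-keypair -i ~/my-keypair.pem stop mycluster

# 2. In the AWS Console, add instances to the mycluster-slaves security group
#    ("launch more of these" on an existing slave) or terminate the slaves
#    you no longer want.

# 3. Start the cluster again; spark-ec2 sets it up on every machine it finds
#    in the mycluster-slaves security group.
./spark-ec2 -k my-keypair -i ~/my-keypair.pem start mycluster

As the thread notes, this does not help with spot instances, since stopped spot slaves are lost rather than restarted.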