Re: using MultipleOutputFormat to ensure one output file per key

2014-11-25 Thread Rafal Kwasny
Hi,

Arpan Ghosh wrote:
 Hi,

 How can I implement a custom MultipleOutputFormat and specify it as
 the output format of my Spark job, so that I can ensure that there is a unique
 output file per key (instead of a unique output file per reducer)?


I use something like this:

import java.io.IOException

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.{FileAlreadyExistsException, InvalidJobConfException, JobConf}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {
  // Put each record under a directory named after its key.
  override protected def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString() + "/" + leaf
  }
  // Drop the key from the written records; only the value goes to the file.
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
  // This could be dangerous and overwrite files: the usual output-existence check is disabled.
  @throws(classOf[FileAlreadyExistsException])
  @throws(classOf[InvalidJobConfException])
  @throws(classOf[IOException])
  override def checkOutputSpecs(ignored: FileSystem, job: JobConf) = {
  }
}

and then just set a jobconf:

  val jobConf = new JobConf(self.context.hadoopConfiguration)
  jobConf.setOutputKeyClass(classOf[String])
  jobConf.setOutputValueClass(classOf[String])
  jobConf.setOutputFormat(classOf[KeyBasedOutput[String, String]])
  rdd.saveAsHadoopDataset(jobConf)
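
For completeness, here is a minimal sketch of the pieces around that jobconf (assuming an existing SparkContext sc; the sample pair RDD and output path are hypothetical, and saveAsHadoopDataset takes the output directory from the JobConf):

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
  import org.apache.spark.SparkContext._   // pair-RDD functions on pre-1.3 Spark

  // Hypothetical pair RDD: records end up under one directory per date key.
  val rdd = sc.parallelize(Seq(("2014-11-24", "first line"), ("2014-11-25", "second line")))

  val jobConf = new JobConf(sc.hadoopConfiguration)
  jobConf.setOutputKeyClass(classOf[String])
  jobConf.setOutputValueClass(classOf[String])
  jobConf.setOutputFormat(classOf[KeyBasedOutput[String, String]])
  // The output directory has to be set on the JobConf as well.
  FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/key-based-output"))
  rdd.saveAsHadoopDataset(jobConf)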


/Rafal

 Thanks

 Arpan


-- 
Regards
Rafał Kwasny
mailto:/jabberid: m...@entropy.be




Re: Spark output to s3 extremely slow

2014-10-15 Thread Rafal Kwasny
Hi,
How large is the dataset you're saving into S3?

Actually, saving to S3 is done in two steps:
1) writing temporary files
2) committing them to the proper directory
Step 2) can be slow because S3 does not have a quick atomic move
operation; you have to copy (server side, but it still takes time) and then
delete the original.

I've overcome this by using a jobconf with a NullOutputCommitter:
  jobConf.setOutputCommitter(classOf[NullOutputCommitter])

where NullOutputCommitter is a class that doesn't do anything:

  import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

  // A committer that does nothing, so no copy-and-delete commit happens on S3.
  class NullOutputCommitter extends OutputCommitter {
    def abortTask(taskContext: TaskAttemptContext) = { }
    override def cleanupJob(jobContext: JobContext) = { }
    def commitTask(taskContext: TaskAttemptContext) = { }
    def needsTaskCommit(taskContext: TaskAttemptContext) = { false }
    def setupJob(jobContext: JobContext) { }
    def setupTask(taskContext: TaskAttemptContext) { }
  }

This works but maybe someone has a better solution.
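
For reference, a minimal sketch of plugging the committer into a save to S3 (assuming an existing SparkContext sc and a pair RDD rdd of strings; the bucket and path are placeholders):

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

  val jobConf = new JobConf(sc.hadoopConfiguration)
  jobConf.setOutputKeyClass(classOf[String])
  jobConf.setOutputValueClass(classOf[String])
  jobConf.setOutputFormat(classOf[TextOutputFormat[String, String]])
  // Skip the slow copy-and-delete commit step on S3.
  jobConf.setOutputCommitter(classOf[NullOutputCommitter])
  FileOutputFormat.setOutputPath(jobConf, new Path("s3n://yourBucket/output/"))
  rdd.saveAsHadoopDataset(jobConf)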

/Raf

anny9699 wrote:
 Hi,

 I found writing output back to s3 using rdd.saveAsTextFile() is extremely
 slow, much slower than reading from s3. Is there a way to make it faster?
 The rdd has 150 partitions so parallelism is enough I assume.

 Thanks a lot!
 Anny










Re: S3 Bucket Access

2014-10-14 Thread Rafal Kwasny
Hi,
keep in mind that you're going to have a bad time if your secret key
contains a "/".
This is due to an old and stupid Hadoop bug:
https://issues.apache.org/jira/browse/HADOOP-3733

The best way around it is to regenerate the key so it does not include a "/".
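
As a self-contained sketch of passing the credentials through the Hadoop configuration (option 3 in the suggestions quoted below; the app name, bucket, path and key values are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("s3-read"))
  val hadoopConf = sc.hadoopConfiguration
  // Map the s3:// scheme to the native S3 filesystem and hand it the credentials.
  hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  hadoopConf.set("fs.s3.awsAccessKeyId", "yourAccessKey")
  hadoopConf.set("fs.s3.awsSecretAccessKey", "yourSecretKey")
  val lines = sc.textFile("s3://yourBucket/path/")
  println(lines.count())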

/Raf

Akhil Das wrote:
 Try the following:

 1. Set the access key and secret key in the sparkContext:

 sparkContext.set("AWS_ACCESS_KEY_ID", "yourAccessKey")
 sparkContext.set("AWS_SECRET_ACCESS_KEY", "yourSecretKey")

 2. Set the access key and secret key in the environment before
 starting your application:

 export AWS_ACCESS_KEY_ID="yourAccessKey"
 export AWS_SECRET_ACCESS_KEY="yourSecretKey"

 3. Set the access key and secret key inside the hadoop configuration:

 val hadoopConf = sparkContext.hadoopConfiguration
 hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
 hadoopConf.set("fs.s3.awsAccessKeyId", "yourAccessKey")
 hadoopConf.set("fs.s3.awsSecretAccessKey", "yourSecretKey")

 4. You can also try:

 val lines = sparkContext.textFile("s3n://yourAccessKey:yourSecretKey@yourBucket/path/")


 Thanks
 Best Regards

 On Mon, Oct 13, 2014 at 11:33 PM, Ranga sra...@gmail.com wrote:

 Hi

 I am trying to access files/buckets in S3 and encountering a
 permissions issue. The buckets are configured to authenticate
 using an IAMRole provider.
 I have set the KeyId and Secret using environment variables
 (AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID). However, I am still
 unable to access the S3 buckets. 

 Before setting the access key and secret the error was:
 java.lang.IllegalArgumentException: AWS Access Key ID and Secret
 Access Key must be specified as the username or password
 (respectively) of a s3n URL, or by setting the
 fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties
 (respectively).

 After setting the access key and secret, the error is: The AWS
 Access Key Id you provided does not exist in our records.

 The id/secret being set are the right values. This makes me
 believe that something else (token, etc.) needs to be set as well. 
 Any help is appreciated. 


 - Ranga





Re: Having spark-ec2 join new slaves to existing cluster

2014-04-06 Thread Rafal Kwasny
Hi,
This will work nicely unless you're using spot instances; in that case
"start" does not work, because the slaves are lost on shutdown.
I feel like the spark-ec2 script needs a major refactor to cope with new
features and with more users using it in dynamic environments.
Are there any current plans to migrate it to a CDH5-based (just released)
install?

/Raf

Nicholas Chammas wrote:
 Sweet, thanks for the instructions. This will do for resizing a dev
 cluster that you can bring down at will.

 I will open a JIRA issue about adding the functionality I described to
 spark-ec2.


 On Fri, Apr 4, 2014 at 3:43 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 This can't be done through the script right now, but you can do it
 manually as long as the cluster is stopped. If the cluster is
 stopped, just go into the AWS Console, right-click a slave and
 choose "launch more of these" to add more. Or select multiple
 slaves and delete them. When you run "spark-ec2 start" the next time
 to start your cluster, it will set it up on all the machines it
 finds in the mycluster-slaves security group.

 This is pretty hacky so it would definitely be good to add this
 feature; feel free to open a JIRA about it.

 Matei

 On Apr 4, 2014, at 12:16 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

 I would like to be able to use spark-ec2 to launch new slaves and
 add them to an existing, running cluster. Similarly, I would also
 like to remove slaves from an existing cluster.

 Use cases include:

  1. Oh snap, I sized my cluster incorrectly. Let me add/remove
 some slaves.
  2. During scheduled batch processing, I want to add some new
 slaves, perhaps on spot instances. When that processing is
 done, I want to kill them. (Cruel, I know.)

 I gather this is not possible at the moment. spark-ec2 appears to
 be able to launch new slaves for an existing cluster only if the
 master is stopped. I also do not see any ability to remove slaves
 from a cluster.

 Is that correct? Are there plans to add such functionality to
 spark-ec2 in the future?

 Nick


 