I need a little help. I am loading zipped CSV files stored in S3 into Spark 1.6.

First of all, I am able to get the list of file keys whose modified date falls 
within a range of time by using the AWS SDK objects (AmazonS3Client, 
ObjectListing, S3ObjectSummary, ListObjectsRequest, GetObjectRequest). Then, 
after setting the S3 access and secret keys on the Hadoop configuration, I 
parallelize, partition, and iterate through the list to load each file's 
contents into an RDD[(String, org.apache.spark.input.PortableDataStream)].

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", accessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", secretKey)

val filesRdd = sc.parallelize(lFiles)
filesRdd.foreachPartition(files => {
  val lZipFiles = files.map(x => sc.binaryFiles("s3://" + s3Bucket + "/" + x))
  val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all zip input streams
  val lStrContent = lZipStream.map(x => readZipStream(x))    // read contents into strings
})

This is where I need help. I get this error.

<console>:196: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]
 required: java.io.InputStream
       val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all zip input streams
                                                               ^

Does anyone know how to take the PortableDataStream returned in an RDD and 
convert it into a ZipInputStream?
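In case it helps clarify what I'm after, here is a rough sketch of the shape I 
have in mind, reusing lFiles, s3Bucket, and my readZipStream helper from above. 
It assumes two things I'm not sure about: that sc.binaryFiles can be called 
once on the driver with a comma-separated path list (rather than per element 
inside foreachPartition, where sc isn't available), and that 
PortableDataStream.open() hands back a plain java.io.InputStream that 
ZipInputStream can wrap:

```scala
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream

// Build one comma-separated path string so binaryFiles is invoked once on the
// driver; the SparkContext cannot be used from inside foreachPartition.
val paths = lFiles.map(f => "s3://" + s3Bucket + "/" + f).mkString(",")

// Each element is (path, PortableDataStream); open() is assumed to yield an
// InputStream over the file's bytes, which ZipInputStream can then decode.
val lStrContent = sc.binaryFiles(paths).map { case (path, pds: PortableDataStream) =>
  val zis = new ZipInputStream(pds.open())
  try readZipStream(zis) // read the zip entries' contents into a string
  finally zis.close()
}
```

Is that roughly the right way to use PortableDataStream, or is there a better 
pattern for this?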

Thanks,
Ben