I need a little help. I am loading zipped CSV files stored in S3 into Spark 1.6.
First, I get the list of file keys whose modified date falls within a range of
time, using the AWS SDK objects (AmazonS3Client, ObjectListing, S3ObjectSummary,
ListObjectsRequest, GetObjectRequest). Then, after setting up the
HadoopConfiguration object with the S3 access and secret keys, I parallelize,
partition, and iterate through the list to load each file's contents into an
RDD[(String, org.apache.spark.input.PortableDataStream)].
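For reference, my listing step looks roughly like this (a sketch; the bucket name and date-range variables are placeholders, and pagination is handled with listNextBatchOfObjects):

```scala
import java.util.Date
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
import scala.collection.JavaConverters._

// Collect the keys of all objects in `bucket` whose last-modified date
// falls inside [from, to], following truncated listings page by page.
def listKeysModifiedBetween(s3: AmazonS3Client, bucket: String,
                            from: Date, to: Date): List[String] = {
  val keys = scala.collection.mutable.ListBuffer[String]()
  var listing: ObjectListing = s3.listObjects(new ListObjectsRequest().withBucketName(bucket))
  var done = false
  while (!done) {
    for (summary <- listing.getObjectSummaries.asScala) {
      val m = summary.getLastModified
      if (!m.before(from) && !m.after(to)) keys += summary.getKey
    }
    if (listing.isTruncated) listing = s3.listNextBatchOfObjects(listing)
    else done = true
  }
  keys.toList
}
```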
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", accessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", secretKey)

val filesRdd = sc.parallelize(lFiles)
filesRdd.foreachPartition(files => {
  val lZipFiles = files.map(x => sc.binaryFiles("s3://" + s3Bucket + "/" + x))
  val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all zip input streams
  val lStrContent = lZipStream.map(x => readZipStream(x)) // read contents into string
})
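In case it matters, my readZipStream helper just drains every entry of a ZipInputStream into one string, roughly like this (a minimal sketch of it, assuming UTF-8 content):

```scala
import java.util.zip.ZipInputStream

// Read every entry of the zip stream and concatenate the contents as UTF-8 text.
def readZipStream(zis: ZipInputStream): String = {
  val out = new StringBuilder
  val buf = new Array[Byte](4096)
  var entry = zis.getNextEntry
  while (entry != null) {
    var n = zis.read(buf)
    while (n != -1) {
      out.append(new String(buf, 0, n, "UTF-8"))
      n = zis.read(buf)
    }
    zis.closeEntry()
    entry = zis.getNextEntry
  }
  zis.close()
  out.toString
}
```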
This is where I need help. I get this error:
<console>:196: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]
 required: java.io.InputStream
       val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all zip input streams
                                                               ^
Does anyone know how to take the PortableDataStream returned in an RDD and
convert it into a ZipInputStream?
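My current guess (untested, so please correct me) is that binaryFiles has to be called on the driver rather than inside foreachPartition, since SparkContext isn't usable on executors, and that PortableDataStream.open() gives back an InputStream I can wrap. Something like:

```scala
import java.util.zip.ZipInputStream

// Untested sketch: call binaryFiles once on the driver with a
// comma-separated path list, then open each PortableDataStream on the
// executors. pds.open() returns a DataInputStream, i.e. an InputStream.
val paths = lFiles.map(k => "s3://" + s3Bucket + "/" + k).mkString(",")
val zipRdd = sc.binaryFiles(paths) // RDD[(String, PortableDataStream)]
val lStrContent = zipRdd.map { case (path, pds) =>
  val zis = new ZipInputStream(pds.open())
  try readZipStream(zis) finally zis.close()
}
```

Is that the right way to do it?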
Thanks,
Ben