I need a little help. I am loading zipped CSV files stored in S3 into Spark 1.6.
First of all, I am able to get the list of file keys whose last-modified date falls within a range of time by using the AWS SDK classes (AmazonS3Client, ObjectListing, S3ObjectSummary, ListObjectsRequest, GetObjectRequest). Then, after setting the S3 access and secret keys on the Hadoop configuration, I parallelize and partition the list and iterate through it to load each file's contents into an RDD[(String, org.apache.spark.input.PortableDataStream)]:

```scala
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", accessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", secretKey)

val filesRdd = sc.parallelize(lFiles)
filesRdd.foreachPartition(files => {
  val lZipFiles = files.map(x => sc.binaryFiles("s3://" + s3Bucket + "/" + x))
  val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all zip input streams
  val lStrContent = lZipStream.map(x => readZipStream(x))    // read contents into strings
})
```

This is where I need help. I get this error:

```
<console>:196: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]
 required: java.io.InputStream
       val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all zip input streams
                                                                  ^
```

Does anyone know how to take the PortableDataStream returned in an RDD and convert it into a ZipInputStream?

Thanks,
Ben

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
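One detail that may help here: sc.binaryFiles itself already returns the RDD[(String, PortableDataStream)] pairs, and SparkContext methods such as binaryFiles can only be called on the driver, not inside foreachPartition. A PortableDataStream can be turned into a java.io.InputStream with its open() method (it returns a DataInputStream). A minimal sketch along those lines, not a definitive answer: readZipStream is assumed to be the poster's helper taking a ZipInputStream and returning a String, and s3Bucket is as above.

```scala
import java.util.zip.ZipInputStream

import org.apache.spark.input.PortableDataStream

// Called on the driver; the glob matches the zip files under the bucket.
// (Assumes the fs.s3 credentials were set on hadoopConf as shown earlier.)
val zipsRdd = sc.binaryFiles("s3://" + s3Bucket + "/*.zip")

// Runs on the executors: open() yields a DataInputStream (an InputStream),
// which ZipInputStream can wrap directly.
val contentsRdd = zipsRdd.map { case (path, pds: PortableDataStream) =>
  val zis = new ZipInputStream(pds.open())
  try {
    (path, readZipStream(zis)) // hypothetical helper: ZipInputStream => String
  } finally {
    zis.close()
  }
}
```

With this shape there is no iterator of per-file RDDs to flatten; the (path, contents) pairs come back in a single RDD[(String, String)].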