GitHub user Susmit07 deleted a comment on the discussion: S3 multipart upload
for parquet
Sharing the code for reference
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.ContentTypes
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.connectors.hdfs.scaladsl.HdfsSource
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.connectors.s3.{MultipartUploadResult, S3Headers}
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
/** Example program that copies one HDFS file to S3 using Pekko Connectors.
  *
  * Files whose size is at most `fileSizeThreshold` are sent with a single
  * `S3.putObject` request; larger files are sent with `S3.multipartUpload`.
  */
object HDFSFileToS3Uploader {

  // 50 MiB threshold for switching between putObject and multipart upload;
  // ideally it should be above 100 MB.
  private val fileSizeThreshold: Long = 50L * 1024 * 1024

  /** Streams a single HDFS file into an S3 object.
    *
    * @param hdfsFilePath HDFS file path
    * @param bucket       S3 bucket name
    * @param bucketKey    S3 object key (file path in S3)
    */
  private class HDFSStreamToS3(
      hdfsFilePath: String,
      bucket: String,
      bucketKey: String
  )(implicit system: ActorSystem, mat: Materializer) {

    // Built on demand so that an invalid path surfaces inside the upload
    // Future rather than while constructing this class.
    private def hdfsPath: Path = new Path(hdfsFilePath)

    // Resolves the FileSystem from a default Hadoop Configuration.
    private def fileSystem: FileSystem = FileSystem.get(new Configuration())

    /** Streams the HDFS file as a ByteString source. */
    private def streamHDFSFile: Source[ByteString, _] =
      HdfsSource.data(fileSystem, hdfsPath)

    /** Returns the length of the HDFS file as reported by its file status. */
    private def getHDFSFileSize: Long =
      fileSystem.getFileStatus(hdfsPath).getLen

    /** Uploads the file with a single putObject request.
      *
      * @param contentLength value passed to `S3.putObject` as the content length
      * @return a Future completed with `Done` once the object metadata arrived
      */
    private def uploadSmallFileToS3(contentLength: Long): Future[Done] =
      S3
        .putObject(
          bucket,
          bucketKey,
          streamHDFSFile,
          contentLength,
          ContentTypes.`application/octet-stream`,
          S3Headers.empty
        )
        .runWith(Sink.head)
        .map { objectMetadata =>
          println(s"Uploaded successfully with ETag: ${objectMetadata.eTag}")
          Done
        }

    /** Uploads the file through the S3 multipart upload sink. */
    private def uploadLargeFileToS3(): Future[MultipartUploadResult] =
      streamHDFSFile.runWith(S3.multipartUpload(bucket, bucketKey))

    /** Chooses putObject or multipart upload based on the HDFS file size.
      *
      * The size lookup is wrapped in a `Try`, so a failure there (for example a
      * missing file) is reported as a failed Future instead of being thrown at
      * the caller, and callers' completion handlers always run.
      *
      * @return a Future that completes when the selected upload finishes
      */
    def uploadToS3(): Future[_] =
      Future
        .fromTry(scala.util.Try(getHDFSFileSize))
        .flatMap[Any] { fileSize =>
          println(s"Starting => ${Instant.now()}")
          if (fileSize <= fileSizeThreshold) {
            println(s"File size $fileSize is small, using putObject.")
            uploadSmallFileToS3(fileSize)
          } else {
            println(s"File size $fileSize is large, using multipart upload.")
            uploadLargeFileToS3()
          }
        }
        .andThen {
          case Success(_) =>
            println(s"Upload ended at: ${Instant.now()}")
          case Failure(exception) =>
            println(s"Upload failed at: ${Instant.now()}, Error: ${exception.getMessage}")
        }
  }

  /** Entry point: uploads one hard-coded HDFS file and then shuts the actor system down. */
  def main(args: Array[String]): Unit = {
    // Initialize the Actor System and Materializer needed by Pekko Streams
    implicit val system: ActorSystem = ActorSystem("HDFSFileToS3System")
    implicit val mat: Materializer = Materializer(system)

    // Define the HDFS file path, S3 bucket, and S3 object key
    val hdfsFilePath = "/Users/susmit.sarkar/Downloads/test.snappy.parquet"
    val bucket = "bg0975-cef-ccmedev-data"
    val bucketKey = "test.snappy.parquet"

    // Create an instance of the HDFSStreamToS3 class and start the upload process
    val hdfsToS3Uploader = new HDFSStreamToS3(hdfsFilePath, bucket, bucketKey)
    val result: Future[_] = hdfsToS3Uploader.uploadToS3()

    // Report the outcome, then terminate the actor system in either case
    result.onComplete { outcome =>
      outcome match {
        case Success(_) =>
          println("File uploaded successfully to S3.")
        case Failure(exception) =>
          exception.printStackTrace()
          println(s"File upload failed: ${exception.getMessage}")
      }
      system.terminate()
    }
  }
}
```
The snippet below can be added to the docs; the multipart upload example is already there.
```
// Uploads a small file with a single putObject request.
// `contentLength` is passed to S3.putObject as the content length of the object.
// The returned Future completes with Done once the object metadata has arrived.
private def uploadSmallFileToS3(contentLength: Long): Future[Done] =
  S3
    .putObject(
      bucket,
      bucketKey,
      streamHDFSFile,
      contentLength,
      ContentTypes.`application/octet-stream`,
      S3Headers.empty
    )
    // Run the stream; the first emitted element is the uploaded object's metadata
    .runWith(Sink.head)
    .map { objectMetadata =>
      println(s"Uploaded successfully with ETag: ${objectMetadata.eTag}")
      Done
    }
```
Several advantages of using S3.multipartUpload and S3.putObject:
1. Both APIs are all-or-nothing: no dangling file is visible if the process
restarts.
2. For large files we can use multipart upload, and we can resume the upload if
we keep track of the eTag of the last upload.
3. Both support any file type.
4. AvroParquetWriter can also work, but it's confined to Parquet files only.
```
// Target path of the Parquet file
val file = "./sample/path/test.parquet"

// Hadoop configuration with the Avro compatibility flag switched on
val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

// Build the writer for `Record` values using the configuration and schema above
val writer =
  AvroParquetWriter
    .builder[Record](new Path(file))
    .withConf(conf)
    .withSchema(schema)
    .build()
```
We need an extra library, hadoop-aws, to upload to S3.
```
// Creates an AvroParquetWriter to write data to S3.
//
// The schema is taken from the first record emitted by `parquetSource`, which
// means the source is materialized once here just to read that record.
// If the source is empty, the returned Future fails with an
// IllegalArgumentException; there is no fallback schema.
// Note: AVRO_COMPATIBILITY is set on the `configuration` instance passed in.
private def createParquetWriter(
    outputFile: HadoopOutputFile,
    parquetSource: Source[GenericRecord, NotUsed],
    configuration: Configuration
)(implicit system: ActorSystem[_], ec: ExecutionContext): Future[ParquetWriter[GenericRecord]] = {
  // Extract the schema from the first record in the stream
  val schemaFuture: Future[Schema] =
    parquetSource.runWith(Sink.headOption).flatMap {
      case Some(firstRecord) =>
        Future.successful(firstRecord.getSchema)
      case None =>
        logger.warn("Source stream is empty, cannot extract a schema.")
        Future.failed(
          new IllegalArgumentException("No records available in the source to extract schema.")
        )
    }

  // Once the schema is extracted, create the ParquetWriter
  schemaFuture.map { schema =>
    configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
    AvroParquetWriter
      .builder[GenericRecord](outputFile)
      .withConf(configuration)
      .withSchema(schema)
      .withWriteMode(Mode.OVERWRITE)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .withDictionaryEncoding(true)
      .build()
  }
}
// Another way of writing to S3: build the writer first, then push every record
// of the source through AvroParquetFlow and discard the flow's output.
createParquetWriter(tempOutputFile, parquetSource, config).flatMap { writer =>
  parquetSource
    .via(AvroParquetFlow(writer))
    .runWith(Sink.ignore)
}
```
GitHub link:
https://github.com/apache/pekko-connectors/discussions/870#discussioncomment-10996635
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]