GitHub user Susmit07 deleted a comment on the discussion: S3 multipart upload 
for parquet

Sharing the code for reference

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.ContentTypes
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.connectors.hdfs.scaladsl.HdfsSource
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.connectors.s3.{MultipartUploadResult, S3Headers}
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

object HDFSFileToS3Uploader {

  // 5 MB threshold for switching between putObject and multipart, ideally its 
above 100mb.
  private val fileSizeThreshold: Long = 50 * 1024 * 1024

  private class HDFSStreamToS3(
                                hdfsFilePath: String, // HDFS file path
                                bucket: String, // S3 bucket name
                                bucketKey: String // S3 object key (file path 
in S3)
                              )(implicit system: ActorSystem, mat: 
Materializer) {

    // Function to stream HDFS file as a ByteString source
    private def streamHDFSFile: Source[ByteString, _] = {
      val hadoopConfig = new Configuration()
      val hdfsPath = new Path(hdfsFilePath)

      // Get the HDFS FileSystem and open the file stream
      val fs = FileSystem.get(hadoopConfig)
      HdfsSource.data(fs, hdfsPath)
    }

    // Function to get the file size from HDFS
    private def getHDFSFileSize: Long = {
      val hadoopConfig = new Configuration()
      val hdfsPath = new Path(hdfsFilePath)
      val fs = FileSystem.get(hadoopConfig)
      fs.getFileStatus(hdfsPath).getLen
    }

    // Function to upload small files using putObject with content length
    private def uploadSmallFileToS3(contentLength: Long): Future[_] = {
      val hdfsSource: Source[ByteString, _] = streamHDFSFile

      // Define the S3 putObject with content length
      val s3Headers = S3Headers.empty
      val s3PutObject = S3.putObject(
        bucket,
        bucketKey,
        hdfsSource,
        contentLength,
        ContentTypes.`application/octet-stream`,
        s3Headers
      )

      // Run the stream to upload the file using putObject
      s3PutObject.runWith(Sink.head).flatMap(objectMetadata => {
        println(s"Uploaded successfully with ETag: ${objectMetadata.eTag}")
        Future.successful(Done)
      })
    }

    // Function to upload large files using multipart upload
    private def uploadLargeFileToS3(): Future[MultipartUploadResult] = {
      val hdfsSource: Source[ByteString, _] = streamHDFSFile

      // Define the S3 multipart upload sink
      val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
        S3.multipartUpload(bucket, bucketKey)

      // Upload the file using multipart upload
      hdfsSource.runWith(s3Sink)
    }

    // Function to determine whether to use putObject or multipartUpload based 
on file size
    def uploadToS3(): Future[_] = {
      val fileSize = getHDFSFileSize
      println(s"Starting => ${Instant.now()}")
      if (fileSize <= fileSizeThreshold) {
        println(s"File size $fileSize is small, using putObject.")
        uploadSmallFileToS3(fileSize)
      } else {
        println(s"File size $fileSize is large, using multipart upload.")
        uploadLargeFileToS3()
      }
    }.andThen {
      case Success(_) =>
        val endTime = Instant.now()
        println(s"Upload ended at: $endTime")
      case Failure(exception) =>
        val endTime = Instant.now()
        println(s"Upload failed at: $endTime, Error: ${exception.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    // Initialize the Actor System and Materializer needed by Pekko Streams
    implicit val system: ActorSystem = ActorSystem("HDFSFileToS3System")
    implicit val mat: Materializer = Materializer(system)

    // Define the HDFS file path, S3 bucket, and S3 object key
    val hdfsFilePath = "/Users/susmit.sarkar/Downloads/test.snappy.parquet"
    val bucket = "bg0975-cef-ccmedev-data"
    val bucketKey = "test.snappy.parquet"

    // Create an instance of the HDFSStreamToS3 class and start the upload 
process
    val hdfsToS3Uploader = new HDFSStreamToS3(hdfsFilePath, bucket, bucketKey)
    val result: Future[_] = hdfsToS3Uploader.uploadToS3()

    // Handle the upload result (success or failure)
    result.onComplete {
      case Success(_) =>
        println(s"File uploaded successfully to S3.")
        system.terminate()
      case Failure(exception) =>
        exception.printStackTrace()
        println(s"File upload failed: ${exception.getMessage}")
        system.terminate()
    }
  }
}
```
This below snippet part can be added in doc, multipart upload is already there .

```
// Function to upload small files using putObject with content length
    private def uploadSmallFileToS3(contentLength: Long): Future[_] = {
      val hdfsSource: Source[ByteString, _] = streamHDFSFile

      // Define the S3 putObject with content length
      val s3Headers = S3Headers.empty
      val s3PutObject = S3.putObject(
        bucket,
        bucketKey,
        hdfsSource,
        contentLength,
        ContentTypes.`application/octet-stream`,
        s3Headers
      )

      // Run the stream to upload the file using putObject
      s3PutObject.runWith(Sink.head).flatMap(objectMetadata => {
        println(s"Uploaded successfully with ETag: ${objectMetadata.eTag}")
        Future.successful(Done)
      })
    }
```

Several advantages of using S3.multipartUpload & S3.putObject

1. Both the APIs support all or nothing, no dangling file visible if Process 
restarts 
2. For large files we can use Multipart Upload, and can resume upload if we can 
keep track of the last upload eTag
3. Support for any file type.
4. AvroParquetWriter can also work but its confined to Parquet files only

```
val file = "./sample/path/test.parquet"
val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val writer =
  AvroParquetWriter.builder[Record](new 
Path(file)).withConf(conf).withSchema(schema).build()
```
We need to use hadoop-aws an extra library to upload to S3

```
// Creates an AvroParquetWriter to write data to S3
private def createParquetWriter(
                                   outputFile: HadoopOutputFile,
                                   parquetSource: Source[GenericRecord, 
NotUsed],
                                   configuration: Configuration
                                 )(implicit system: ActorSystem[_], ec: 
ExecutionContext): Future[ParquetWriter[GenericRecord]] = {
    // Extract schema from the first record in the stream and create 
ParquetWriter
    val schemaFuture: Future[Schema] = 
parquetSource.runWith(Sink.headOption).flatMap {
      case Some(firstRecord) =>
        Future.successful(firstRecord.getSchema)
      case None =>
        logger.warn("Source stream is empty, falling back to default schema.")
        Future.failed(new IllegalArgumentException("No records available in the 
source to extract schema."))
    }

    // Once the schema is extracted, create the ParquetWriter
    schemaFuture.map { schema =>
      configuration.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
      AvroParquetWriter.builder[GenericRecord](outputFile)
        .withConf(configuration)
        .withSchema(schema)
        .withWriteMode(Mode.OVERWRITE)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withDictionaryEncoding(true)
        .build()
    }
  }

// Another way of writing to S3.
createParquetWriter(tempOutputFile, parquetSource, config).flatMap { writer 
=>.flatMap { writer =>
    val processedSource = parquetSource.via(AvroParquetFlow(writer))
    processedSource.runWith(Sink.ignore)
}
```

GitHub link: 
https://github.com/apache/pekko-connectors/discussions/870#discussioncomment-10996635

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: 
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to