mdedetrich commented on code in PR #167:
URL: 
https://github.com/apache/incubator-pekko-connectors/pull/167#discussion_r1249748399


##########
s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala:
##########
@@ -1177,11 +1177,15 @@ import scala.util.{ Failure, Success, Try }
 
         import conf.multipartUploadSettings.retrySettings._
 
-        SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
-          .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // 
creates the chunks
-          .mergeSubstreamsWithParallelism(parallelism)
+        val source1: SubFlow[Chunk, NotUsed, Flow[ByteString, ByteString, 
NotUsed]#Repr, Sink[ByteString, NotUsed]] =
+          SplitAfterSize(chunkSize, chunkBufferSize)(atLeastOneByteString)
+            .via(getChunkBuffer(chunkSize, chunkBufferSize, maxRetries)) // 
creates the chunks
+
+        val source2 = source1.mergeSubstreamsWithParallelism(parallelism)

Review Comment:
   The `source2` here seems to be a leftover from refactoring/figuring out 
types locally? You should be able to just go
   
   ```scala
   source1.mergeSubstreamsWithParallelism(parallelism)
     .filter(_.size > 0)
     .via(atLeastOne)
     .zip(requestInfoOrUploadState(s3Location, contentType, s3Headers, 
initialUploadState))
     // etc etc
   ```



##########
s3/src/main/scala/org/apache/pekko/stream/connectors/s3/javadsl/S3.scala:
##########
@@ -485,8 +486,11 @@ object S3 {
    * @return A [[pekko.stream.javadsl.Source]] containing the objects data as 
a [[pekko.util.ByteString]] along with a materialized value containing the
    *         [[pekko.stream.connectors.s3.ObjectMetadata]]
    */
-  def getObject(bucket: String, key: String): Source[ByteString, 
CompletionStage[ObjectMetadata]] =
-    new Source(S3Stream.getObject(S3Location(bucket, key), None, None, 
S3Headers.empty).toCompletionStage())
+  def getObject(bucket: String, key: String): Source[ByteString, 
CompletionStage[ObjectMetadata]] = {
+    val objectSource: SourceToCompletionStage[ByteString, ObjectMetadata] =

Review Comment:
   I think this type can be simplified but as before will have to check out 
locally (also applies for other future instances).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to