Hello community,

here is the log from the commit of package ghc-amazonka-s3-streaming for openSUSE:Factory checked in at 2017-08-31 20:50:02
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/ghc-amazonka-s3-streaming (Old)
 and      /work/SRC/openSUSE:Factory/.ghc-amazonka-s3-streaming.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "ghc-amazonka-s3-streaming"

Thu Aug 31 20:50:02 2017 rev:3 rq:513200 version:0.2.0.3

Changes:
--------
--- /work/SRC/openSUSE:Factory/ghc-amazonka-s3-streaming/ghc-amazonka-s3-streaming.changes      2017-07-28 09:47:55.153927102 +0200
+++ /work/SRC/openSUSE:Factory/.ghc-amazonka-s3-streaming.new/ghc-amazonka-s3-streaming.changes  2017-08-31 20:50:04.125305973 +0200
@@ -1,0 +2,5 @@
+Thu Jul 27 14:07:03 UTC 2017 - psim...@suse.com
+
+- Update to version 0.2.0.3.
+
+-------------------------------------------------------------------

Old:
----
  amazonka-s3-streaming-0.1.0.4.tar.gz

New:
----
  amazonka-s3-streaming-0.2.0.3.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ ghc-amazonka-s3-streaming.spec ++++++
--- /var/tmp/diff_new_pack.oYwQwi/_old  2017-08-31 20:50:05.081171798 +0200
+++ /var/tmp/diff_new_pack.oYwQwi/_new  2017-08-31 20:50:05.093170114 +0200
@@ -18,7 +18,7 @@
 
 %global pkg_name amazonka-s3-streaming
 Name:           ghc-%{pkg_name}
-Version:        0.1.0.4
+Version:        0.2.0.3
 Release:        0
 Summary:        Provides conduits to upload data to S3 using the Multipart API
 License:        MIT
@@ -33,8 +33,10 @@
 BuildRequires:  ghc-bytestring-devel
 BuildRequires:  ghc-conduit-devel
 BuildRequires:  ghc-conduit-extra-devel
+BuildRequires:  ghc-deepseq-devel
 BuildRequires:  ghc-dlist-devel
 BuildRequires:  ghc-exceptions-devel
+BuildRequires:  ghc-http-client-devel
 BuildRequires:  ghc-lens-devel
 BuildRequires:  ghc-lifted-async-devel
 BuildRequires:  ghc-mmap-devel
@@ -83,6 +85,6 @@
 
 %files devel -f %{name}-devel.files
 %defattr(-,root,root,-)
-%doc README.md
+%doc Changelog.md README.md
 
 %changelog

++++++ amazonka-s3-streaming-0.1.0.4.tar.gz -> amazonka-s3-streaming-0.2.0.3.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/Changelog.md new/amazonka-s3-streaming-0.2.0.3/Changelog.md
--- old/amazonka-s3-streaming-0.1.0.4/Changelog.md      1970-01-01 01:00:00.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/Changelog.md      2017-06-05 05:40:29.000000000 +0200
@@ -0,0 +1,20 @@
+# Changelog - amazonka-s3-streaming
+
+## 0.2.0.3
+ * Make all library generated messages use Debug level not Info
+
+## 0.2.0.2
+ * Update to mmorph < 1.2
+
+## 0.2.0.1
+ * Fixed a bug with the printf format strings which would lead to a crash (Thanks @JakeOShannessy
+   for reporting).
+
+## 0.2.0.0
+ * Fixed a potential bug with very large uploads where the chunksize might be too small
+   for the limit of 10,000 chunks per upload (#6).
+ * Change API to allow the user to specify a chunk size for streaming if the user knows
+   more about the data than we do.
+ * Allow the user to specify how many concurrent threads to use for `concurrentUpload`
+   as well as chunk size (#4).
+ * Better specify cabal dependency ranges.
\ No newline at end of file
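
For readers skimming the changelog: the 0.2 API exposes the new knobs as two leading Maybe arguments. Below is a minimal sketch of a caller under that API; the helper name uploadFile, the 16 MiB part size and the thread count of 4 are illustrative assumptions, not code from the package.

    import Network.AWS                    (Env, runAWS, runResourceT)
    import Network.AWS.S3                 (BucketName, ObjectKey)
    import Network.AWS.S3.StreamingUpload

    -- uploadFile is a hypothetical caller, not part of the package.
    -- Parts are 16 MiB and at most 4 part uploads run at once; a chunk
    -- size below minimumChunkSize is raised to the minimum, and if the
    -- data would need more than 10,000 parts the library enlarges it.
    uploadFile :: Env -> FilePath -> BucketName -> ObjectKey
               -> IO CompleteMultipartUploadResponse
    uploadFile env path bucket key =
        runResourceT . runAWS env $
            concurrentUpload (Just (16 * 1024 * 1024)) (Just 4) (FP path)
                             (createMultipartUpload bucket key)
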
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/Main.hs new/amazonka-s3-streaming-0.2.0.3/Main.hs
--- old/amazonka-s3-streaming-0.1.0.4/Main.hs   2016-12-24 12:01:09.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/Main.hs   2017-02-13 10:34:19.000000000 +0100
@@ -3,20 +3,21 @@
 module Main where
 
 
-import Data.Conduit        (($$))
-import Data.Conduit.Binary (sourceHandle)
-import Data.Text           (pack)
-
-import Network.AWS
-import Network.AWS.Data.Text                (fromText)
-import Network.AWS.S3.CreateMultipartUpload
-import Network.AWS.S3.StreamingUpload
-
-import System.Environment
-import System.IO          (BufferMode(BlockBuffering), hSetBuffering, stdin)
+import           Data.Conduit                         (($$))
+import           Data.Conduit.Binary                  (sourceHandle)
+import           Data.Text                            (pack)
+
+import           Network.AWS
+import           Network.AWS.Data.Text                (fromText)
+import           Network.AWS.S3.CreateMultipartUpload
+import           Network.AWS.S3.StreamingUpload
+
+import           System.Environment
+import           System.IO                            (BufferMode(BlockBuffering),
+                                                       hSetBuffering, stdin)
 
 #if !MIN_VERSION_base(4,8,0)
-import Control.Applicative (pure, (<$>), (<*>))
+import           Control.Applicative                  (pure, (<$>), (<*>))
 #endif
 
 main :: IO ()
@@ -38,9 +39,9 @@
           hSetBuffering stdin (BlockBuffering Nothing)
           res <- runResourceT . runAWS env $ case file of
                   -- Stream data from stdin
-                  "-" -> sourceHandle stdin $$ streamUpload (createMultipartUpload buck ky)
+                  "-" -> sourceHandle stdin $$ streamUpload Nothing (createMultipartUpload buck ky)
                   -- Read from a file
-                  _   -> concurrentUpload (FP file) $ createMultipartUpload buck ky
+                  _   -> concurrentUpload Nothing Nothing (FP file) $ createMultipartUpload buck ky
           print res
         Left err -> print err >> usage
     ("abort":region:profile:credfile:bucket:_) ->
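
The hunk above shows the call sites adapting to the new signatures. For a caller that wants an explicit part size rather than Nothing, a sketch of the conduit-based path might look like the following; streamFile and the 8 MiB figure are illustrative, not taken from the package.

    import Data.Conduit                   (($$))
    import Data.Conduit.Binary            (sourceFile)
    import Network.AWS                    (Env, runAWS, runResourceT)
    import Network.AWS.S3                 (BucketName, ObjectKey)
    import Network.AWS.S3.StreamingUpload

    -- streamFile is a hypothetical helper: it streams a file through the
    -- conduit sink using 8 MiB parts instead of the default minimum.
    streamFile :: Env -> FilePath -> BucketName -> ObjectKey
               -> IO CompleteMultipartUploadResponse
    streamFile env path bucket key =
        runResourceT . runAWS env $
            sourceFile path $$ streamUpload (Just (8 * 1024 * 1024))
                                            (createMultipartUpload bucket key)
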
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/README.md new/amazonka-s3-streaming-0.2.0.3/README.md
--- old/amazonka-s3-streaming-0.1.0.4/README.md 2016-12-24 12:00:15.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/README.md 2017-05-24 04:37:45.000000000 +0200
@@ -1,3 +1,5 @@
 # amazonka-s3-streaming [![Build Status](https://travis-ci.org/axman6/amazonka-s3-streaming.svg?branch=master)](https://travis-ci.org/axman6/amazonka-s3-streaming)
 
-Provides a conduit based streaming interface and a concurrent interface to uploading data to S3 using the Multipart API.
+Provides a conduit based streaming interface and a concurrent interface to uploading data to S3 using the Multipart API. Also provides a method to upload files or bytestrings of known size in parallel.
+
+The documentation can be found on [Hackage](https://hackage.haskell.org/package/amazonka-s3-streaming).
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/amazonka-s3-streaming.cabal new/amazonka-s3-streaming-0.2.0.3/amazonka-s3-streaming.cabal
--- old/amazonka-s3-streaming-0.1.0.4/amazonka-s3-streaming.cabal       2016-12-24 12:29:54.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/amazonka-s3-streaming.cabal       2017-06-05 05:41:31.000000000 +0200
@@ -1,5 +1,5 @@
 name: amazonka-s3-streaming
-version: 0.1.0.4
+version: 0.2.0.3
 cabal-version: >=1.10
 build-type: Simple
 license: BSD3
@@ -14,6 +14,7 @@
 author: Alex Mason
 extra-source-files:
     README.md
+    Changelog.md
 
 source-repository head
     type: git
@@ -35,13 +36,15 @@
         resourcet >=1.1.7.4 && <1.2,
         conduit >=1.2.6.6 && <1.3,
         bytestring >=0.10.6.0 && <0.11,
-        mmorph >=1.0.6 && <1.1,
+        mmorph >=1.0.6 && <1.2,
         lens >=4.13 && <5.0,
         mtl >=2.2.1 && <2.3,
         exceptions >=0.8.2.1 && <0.9,
-        dlist >=0.8.0.2 && <0.9,
-        lifted-async >=0.9.0 && <0.10,
-        mmap >=0.5.9 && <0.6
+        dlist ==0.8.*,
+        lifted-async ==0.9.*,
+        mmap ==0.5.*,
+        deepseq ==1.4.*,
+        http-client >=0.4 && <0.6
     default-language: Haskell2010
     hs-source-dirs: src
 
@@ -52,11 +55,11 @@
         amazonka >=1.4.3 && <1.5,
         amazonka-core >=1.4.3 && <1.5,
         amazonka-s3 >=1.4.3 && <1.5,
-        amazonka-s3-streaming >=0.1.0.4 && <0.2,
+        amazonka-s3-streaming >=0.2.0.3 && <0.3,
         conduit-extra >=1.1.15 && <1.2,
         conduit >=1.2.8 && <1.3,
         bytestring >=0.10.8.1 && <0.11,
         text >=1.2.2.1 && <1.3
     default-language: Haskell2010
-    ghc-options: -threaded -rtsopts -with-rtsopts=-N
+    ghc-options: -threaded -rtsopts -with-rtsopts=-N -O2
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/src/Network/AWS/S3/StreamingUpload.hs new/amazonka-s3-streaming-0.2.0.3/src/Network/AWS/S3/StreamingUpload.hs
--- old/amazonka-s3-streaming-0.1.0.4/src/Network/AWS/S3/StreamingUpload.hs     2016-12-24 11:33:43.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/src/Network/AWS/S3/StreamingUpload.hs     2017-06-05 05:39:00.000000000 +0200
@@ -1,4 +1,5 @@
 {-# LANGUAGE BangPatterns          #-}
+{-# LANGUAGE CPP                   #-}
 {-# LANGUAGE ConstraintKinds       #-}
 {-# LANGUAGE FlexibleContexts      #-}
 {-# LANGUAGE MultiParamTypeClasses #-}
@@ -15,66 +16,72 @@
     , abortAllUploads
     , module Network.AWS.S3.CreateMultipartUpload
     , module Network.AWS.S3.CompleteMultipartUpload
-    , chunkSize
+    , minimumChunkSize
 ) where
 
-import Network.AWS
-       ( HasEnv(..)
-       , LogLevel(..)
-       , MonadAWS
-       , getFileSize
-       , hashedBody
-       , send
-       , toBody
-       )
-
-import Control.Monad.Trans.AWS (AWSConstraint)
-import Network.AWS.Data.Crypto
-       (Digest, SHA256, hashFinalize, hashInit, hashUpdate)
-
-import Network.AWS.S3.AbortMultipartUpload
-import Network.AWS.S3.CompleteMultipartUpload
-import Network.AWS.S3.CreateMultipartUpload
-import Network.AWS.S3.ListMultipartUploads
-import Network.AWS.S3.Types
-       ( BucketName
-       , cmuParts
-       , completedMultipartUpload
-       , completedPart
-       , muKey
-       , muUploadId
-       )
-import Network.AWS.S3.UploadPart
-
-import Control.Applicative
-import Control.Category             ((>>>))
-import Control.Monad                (forM_, when, (>=>))
-import Control.Monad.IO.Class       (MonadIO, liftIO)
-import Control.Monad.Morph          (lift)
-import Control.Monad.Trans.Resource (MonadBaseControl, MonadResource, throwM)
-
-import Data.Conduit      (Sink, await)
-import Data.Conduit.List (sourceList)
-
-import           Data.ByteString         (ByteString)
-import qualified Data.ByteString         as BS
-import           Data.ByteString.Builder (stringUtf8)
-import           System.IO.MMap          (mmapFileByteString)
-
-import qualified Data.DList         as D
-import           Data.List          (unfoldr)
-import           Data.List.NonEmpty (nonEmpty)
+import           Network.AWS                            (HasEnv(..),
+                                                         LogLevel(..), MonadAWS,
+                                                         getFileSize,
+                                                         hashedBody, send,
+                                                         toBody)
+
+import           Control.Monad.Trans.AWS                (AWSConstraint)
+import           Network.AWS.Data.Crypto                (Digest, SHA256,
+                                                         hashFinalize, hashInit,
+                                                         hashUpdate)
+
+import           Network.AWS.S3.AbortMultipartUpload
+import           Network.AWS.S3.CompleteMultipartUpload
+import           Network.AWS.S3.CreateMultipartUpload
+import           Network.AWS.S3.ListMultipartUploads
+import           Network.AWS.S3.Types                   (BucketName, cmuParts, completedMultipartUpload,
+                                                         completedPart, muKey,
+                                                         muUploadId)
+import           Network.AWS.S3.UploadPart
+
+import           Control.Applicative
+import           Control.Category                       ((>>>))
+import           Control.Monad                          (forM_, when, (>=>))
+import           Control.Monad.IO.Class                 (MonadIO, liftIO)
+import           Control.Monad.Morph                    (lift)
+import           Control.Monad.Reader.Class             (local)
+import           Control.Monad.Trans.Resource           (MonadBaseControl,
+                                                         MonadResource, throwM)
+
+import           Data.Conduit                           (Sink, await)
+import           Data.Conduit.List                      (sourceList)
+
+import           Data.ByteString                        (ByteString)
+import qualified Data.ByteString                        as BS
+import           Data.ByteString.Builder                (stringUtf8)
+import           System.IO.MMap                         (mmapFileByteString)
+
+import           Control.DeepSeq                        (rnf)
+import qualified Data.DList                             as D
+import           Data.List                              (unfoldr)
+import           Data.List.NonEmpty                     (nonEmpty)
+
+import           Control.Exception.Lens                 (catching, handling)
+import           Control.Lens
+
+import           Text.Printf                            (printf)
+
+import           Control.Concurrent                     (newQSem, signalQSem,
+                                                         waitQSem)
+import           Control.Concurrent.Async.Lifted        (forConcurrently)
+import           System.Mem                             (performGC)
+
+import           Network.HTTP.Client                    (defaultManagerSettings,
+                                                         managerConnCount,
+                                                         newManager)
+import           Network.HTTP.Client.Internal           (mMaxConns)
 
-import Control.Exception.Lens (catching, handling)
-import Control.Lens
-
-import Text.Printf (printf)
-
-import Control.Concurrent.Async.Lifted (forConcurrently)
+type ChunkSize = Int
+type NumThreads = Int
 
 -- | Minimum size of data which will be sent in a single part, currently 6MB
-chunkSize :: Int
-chunkSize = 6*1024*1024 -- Making this 5MB+1 seemed to cause AWS to complain
+minimumChunkSize :: ChunkSize
+minimumChunkSize = 6*1024*1024 -- Making this 5MB+1 seemed to cause AWS to complain
 
 
 {- |
@@ -91,15 +98,17 @@
 
 May throw 'Network.AWS.Error'
 -}
-streamUpload :: (MonadResource m, MonadAWS m, AWSConstraint r m)
-             => CreateMultipartUpload
+streamUpload :: (MonadResource m, AWSConstraint r m, MonadAWS m)
+             => Maybe ChunkSize -- ^ Optional chunk size
+             -> CreateMultipartUpload -- ^ Upload location
              -> Sink ByteString m CompleteMultipartUploadResponse
-streamUpload cmu = do
+streamUpload mcs cmu = do
   logger <- lift $ view envLogger
   let logStr :: MonadIO m => String -> m ()
-      logStr = liftIO . logger Info . stringUtf8
+      logStr = liftIO . logger Debug . stringUtf8
+      chunkSize = maybe minimumChunkSize (max minimumChunkSize) mcs
 
-  cmur <- lift (send cmu)
+  cmur <- lift $ send cmu
   when (cmur ^. cmursResponseStatus /= 200) $
     fail "Failed to create upload"
 
@@ -119,18 +128,26 @@
                                               (hashFinalize $ hashUpdate ctx bs)
                                               (D.snoc bss bs)
 
-                    logStr $ printf "\n**** Uploaded part %d size $d\n" partnum bufsize
-
+                    logStr $ printf "\n**** Uploaded part %d size %d\n" partnum bufsize
                     let part = completedPart partnum <$> (rs ^. uprsETag)
-                    go empty 0 hashInit (partnum+1) $ D.snoc completed part
+#if MIN_VERSION_amazonka_s3(1,4,1)
+                        !_ = rnf part
+#endif
+                    liftIO performGC
+                    go empty 0 hashInit (partnum+1) . D.snoc completed $! part
 
         Nothing -> lift $ do
-            rs <- partUploader partnum bufsize (hashFinalize ctx) bss
-
-            logStr $ printf "\n**** Uploaded (final) part %d size $d\n" partnum bufsize
-
-            let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag)
-                prts = nonEmpty =<< sequence allParts
+            prts <- if bufsize > 0
+                then do
+                    rs <- partUploader partnum bufsize (hashFinalize ctx) bss
+
+                    logStr $ printf "\n**** Uploaded (final) part %d size %d\n" partnum bufsize
+
+                    let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag)
+                    pure $ nonEmpty =<< sequence allParts
+                else do
+                    logStr $ printf "\n**** No final data to upload\n"
+                    pure $ nonEmpty =<< sequence (D.toList completed)
 
             send $ completeMultipartUpload bucket key upId
                     & cMultipartUpload ?~ set cmuParts prts completedMultipartUpload
@@ -167,37 +184,68 @@
 
 {-|
 Allows a file or 'ByteString' to be uploaded concurrently, using the
-async library.  'ByteString's are split into 'chunkSize' chunks
-and uploaded directly.
+async library.  The chunk size may optionally be specified, but will be at least
+`minimumChunkSize`, and may be made larger if the `ByteString` or file
+is large enough to cause more than 10,000 chunks.
 
 Files are mmapped into 'chunkSize' chunks and each chunk is uploaded in parallel.
 This considerably reduces the memory necessary compared to reading the contents
 into memory as a strict 'ByteString'. The usual caveats about mmaped files apply:
 if the file is modified during this operation, the data may become corrupt.
 
-May throw `Network.AWS.Error`, or `IOError`.
+May throw `Network.AWS.Error`, or `IOError`; an attempt is made to cancel the
+multipart upload on any error, but this may also fail if, for example, the network
+connection has been broken. See `abortAllUploads` for a crude cleanup method.
 -}
-concurrentUpload :: (MonadAWS m, MonadBaseControl IO m)
-                 => UploadLocation -> CreateMultipartUpload -> m CompleteMultipartUploadResponse
-concurrentUpload ud cmu = do
+concurrentUpload :: (AWSConstraint r m, MonadAWS m, MonadBaseControl IO m)
+                 => Maybe ChunkSize -- ^ Optional chunk size
+                 -> Maybe NumThreads -- ^ Optional number of threads to upload with
+                 -> UploadLocation -- ^ Whether to upload a file on disk or a `ByteString` that's already in memory.
+                 -> CreateMultipartUpload -- ^ Description of where to upload.
+                 -> m CompleteMultipartUploadResponse
+concurrentUpload mcs mnt ud cmu = do
     cmur <- send cmu
     when (cmur ^. cmursResponseStatus /= 200) $
         fail "Failed to create upload"
+    logger <- view envLogger
+    let logStr :: MonadIO m => String -> m ()
+        logStr = liftIO . logger Info . stringUtf8
     let Just upId = cmur ^. cmursUploadId
         bucket    = cmu  ^. cmuBucket
         key       = cmu  ^. cmuKey
+
+        calcChunkSize :: Int -> Int
+        calcChunkSize len =
+            let chunkSize' = maybe minimumChunkSize (max minimumChunkSize) mcs
+            in if len `div` chunkSize' >= 10000 then len `div` 9999 else chunkSize'
+
         -- hndlr :: SomeException -> m CompleteMultipartUploadResponse
         hndlr e = send (abortMultipartUpload bucket key upId) >> throwM e
-
-    handling id hndlr $ do
+    mgr <- view envManager
+    let mConnCount = mMaxConns mgr
+        nThreads = maybe mConnCount (max 1) mnt
+        exec run = if maybe False (> mConnCount) mnt
+                then do
+                    mgr' <- liftIO $ newManager defaultManagerSettings{managerConnCount = nThreads}
+                    local (envManager .~ mgr') run
+                else run
+    exec $ handling id hndlr $ do
+        sem <- liftIO $ newQSem nThreads
         umrs <- case ud of
-            BS bs -> forConcurrently (zip [1..] $ chunksOf chunkSize bs) $ \(partnum, b) -> do
-                    umr <- send . uploadPart bucket key partnum upId . toBody $ b
-                    pure $ completedPart partnum <$> (umr ^. uprsETag)
+            BS bs ->
+                    let chunkSize = calcChunkSize $ BS.length bs
+                    in forConcurrently (zip [1..] $ chunksOf chunkSize bs) $ \(partnum, b) -> do
+                        liftIO $ waitQSem sem
+                        logStr $ "Starting part: " ++ show partnum
+                        umr <- send . uploadPart bucket key partnum upId . toBody $ b
+                        logStr $ "Finished part: " ++ show partnum
+                        liftIO $ signalQSem sem
+                        pure $ completedPart partnum <$> (umr ^. uprsETag)
 
             FP fp -> do
                 fsize <- liftIO $ getFileSize fp
-                let (count,lst) = divMod (fromIntegral fsize) chunkSize
+                let chunkSize = calcChunkSize $ fromIntegral fsize
+                    (count,lst) = divMod (fromIntegral fsize) chunkSize
                     params = [(partnum, chunkSize*offset, size)
                             | partnum <- [1..]
                             | offset  <- [0..count]
@@ -205,8 +253,10 @@
                             ]
 
                 forConcurrently params $ \(partnum,off,size) -> do
+                    liftIO $ waitQSem sem
                     b <- liftIO $ mmapFileByteString fp (Just (fromIntegral off,size))
                     umr <- send . uploadPart bucket key partnum upId . toBody $ b
+                    liftIO $ signalQSem sem
                     pure $ completedPart partnum <$> (umr ^. uprsETag)
 
         let prts = nonEmpty =<< sequence umrs
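
A note on the concurrency changes above: forConcurrently still spawns one thread per part, and the QSem taken with waitQSem and released with signalQSem is what caps how many parts are actually in flight at once. Below is a standalone sketch of that pattern using only base and async; mapConcurrentlyBounded is an illustrative name, not an API of this package, and unlike the code in the diff it releases the semaphore with bracket_ so a failing action cannot leak a slot.

    import Control.Concurrent       (newQSem, signalQSem, waitQSem)
    import Control.Concurrent.Async (forConcurrently)
    import Control.Exception        (bracket_)

    -- Run an IO action over every element concurrently, but let at most
    -- n of them hold the semaphore (i.e. do real work) at any one time.
    mapConcurrentlyBounded :: Int -> (a -> IO b) -> [a] -> IO [b]
    mapConcurrentlyBounded n f xs = do
        sem <- newQSem n
        forConcurrently xs $ \x ->
            bracket_ (waitQSem sem) (signalQSem sem) (f x)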

