Hello community,

here is the log from the commit of package ghc-amazonka-s3-streaming for openSUSE:Factory checked in at 2017-08-31 20:50:02

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/ghc-amazonka-s3-streaming (Old)
 and      /work/SRC/openSUSE:Factory/.ghc-amazonka-s3-streaming.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "ghc-amazonka-s3-streaming" Thu Aug 31 20:50:02 2017 rev:3 rq:513200 version:0.2.0.3 Changes: -------- --- /work/SRC/openSUSE:Factory/ghc-amazonka-s3-streaming/ghc-amazonka-s3-streaming.changes 2017-07-28 09:47:55.153927102 +0200 +++ /work/SRC/openSUSE:Factory/.ghc-amazonka-s3-streaming.new/ghc-amazonka-s3-streaming.changes 2017-08-31 20:50:04.125305973 +0200 @@ -1,0 +2,5 @@ +Thu Jul 27 14:07:03 UTC 2017 - psim...@suse.com + +- Update to version 0.2.0.3. + +------------------------------------------------------------------- Old: ---- amazonka-s3-streaming-0.1.0.4.tar.gz New: ---- amazonka-s3-streaming-0.2.0.3.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ ghc-amazonka-s3-streaming.spec ++++++ --- /var/tmp/diff_new_pack.oYwQwi/_old 2017-08-31 20:50:05.081171798 +0200 +++ /var/tmp/diff_new_pack.oYwQwi/_new 2017-08-31 20:50:05.093170114 +0200 @@ -18,7 +18,7 @@ %global pkg_name amazonka-s3-streaming Name: ghc-%{pkg_name} -Version: 0.1.0.4 +Version: 0.2.0.3 Release: 0 Summary: Provides conduits to upload data to S3 using the Multipart API License: MIT @@ -33,8 +33,10 @@ BuildRequires: ghc-bytestring-devel BuildRequires: ghc-conduit-devel BuildRequires: ghc-conduit-extra-devel +BuildRequires: ghc-deepseq-devel BuildRequires: ghc-dlist-devel BuildRequires: ghc-exceptions-devel +BuildRequires: ghc-http-client-devel BuildRequires: ghc-lens-devel BuildRequires: ghc-lifted-async-devel BuildRequires: ghc-mmap-devel @@ -83,6 +85,6 @@ %files devel -f %{name}-devel.files %defattr(-,root,root,-) -%doc README.md +%doc Changelog.md README.md %changelog ++++++ amazonka-s3-streaming-0.1.0.4.tar.gz -> amazonka-s3-streaming-0.2.0.3.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/Changelog.md new/amazonka-s3-streaming-0.2.0.3/Changelog.md --- old/amazonka-s3-streaming-0.1.0.4/Changelog.md 1970-01-01 01:00:00.000000000 +0100 +++ new/amazonka-s3-streaming-0.2.0.3/Changelog.md 2017-06-05 05:40:29.000000000 +0200 @@ -0,0 +1,20 @@ +# Changelog - amazonka-s3-streaming + +# 0.2.0.3 + * Make all library generated messages use Debug level not Info + +# 0.2.0.2 + * Update to mmorph < 1.2 + +## 0.2.0.1 + * Fixed a bug with the printf format strings which would lead to a crash (Thanks @JakeOShannessy + for reporting). + +## 0.2.0.0 + * Fixed a potential bug with very large uploads where the chunksize might be too small + for the limit of 10,000 chunks per upload (#6). + * Change API to allow the user to specify a chunk size for streaming if the user knows + more about the data than we do. + * Allow the user to specify how many concurrent threads to use for `concurrentUpload` as + as well as chunk size (#4). + * Better specify cabal dependency ranges. 
\ No newline at end of file
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/Main.hs new/amazonka-s3-streaming-0.2.0.3/Main.hs
--- old/amazonka-s3-streaming-0.1.0.4/Main.hs    2016-12-24 12:01:09.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/Main.hs    2017-02-13 10:34:19.000000000 +0100
@@ -3,20 +3,21 @@
 
 module Main where
 
-import Data.Conduit (($$))
-import Data.Conduit.Binary (sourceHandle)
-import Data.Text (pack)
-
-import Network.AWS
-import Network.AWS.Data.Text (fromText)
-import Network.AWS.S3.CreateMultipartUpload
-import Network.AWS.S3.StreamingUpload
-
-import System.Environment
-import System.IO (BufferMode(BlockBuffering), hSetBuffering, stdin)
+import Data.Conduit           (($$))
+import Data.Conduit.Binary    (sourceHandle)
+import Data.Text              (pack)
+
+import Network.AWS
+import Network.AWS.Data.Text  (fromText)
+import Network.AWS.S3.CreateMultipartUpload
+import Network.AWS.S3.StreamingUpload
+
+import System.Environment
+import System.IO              (BufferMode(BlockBuffering),
+                               hSetBuffering, stdin)
 
 #if !MIN_VERSION_base(4,8,0)
-import Control.Applicative (pure, (<$>), (<*>))
+import Control.Applicative    (pure, (<$>), (<*>))
 #endif
 
 main :: IO ()
@@ -38,9 +39,9 @@
         hSetBuffering stdin (BlockBuffering Nothing)
         res <- runResourceT . runAWS env $ case file of
             -- Stream data from stdin
-            "-" -> sourceHandle stdin $$ streamUpload (createMultipartUpload buck ky)
+            "-" -> sourceHandle stdin $$ streamUpload Nothing (createMultipartUpload buck ky)
             -- Read from a file
-            _   -> concurrentUpload (FP file) $ createMultipartUpload buck ky
+            _   -> concurrentUpload Nothing Nothing (FP file) $ createMultipartUpload buck ky
         print res
       Left err -> print err >> usage
     ("abort":region:profile:credfile:bucket:_) ->
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/README.md new/amazonka-s3-streaming-0.2.0.3/README.md
--- old/amazonka-s3-streaming-0.1.0.4/README.md    2016-12-24 12:00:15.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/README.md    2017-05-24 04:37:45.000000000 +0200
@@ -1,3 +1,5 @@
 # amazonka-s3-streaming [![Build Status](https://travis-ci.org/axman6/amazonka-s3-streaming.svg?branch=master)](https://travis-ci.org/axman6/amazonka-s3-streaming)
 
-Provides a conduit based streaming interface and a concurrent interface to uploading data to S3 using the Multipart API.
+Provides a conduit based streaming interface and a concurrent interface to uploading data to S3 using the Multipart API. Also provides method to upload files or bytestrings of known size in parallel.
+
+The documentation can be found on [Hackage](https://hackage.haskell.org/package/amazonka-s3-streaming).
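For context on the Main.hs and README hunks above: in the 0.2 series both entry points grew optional tuning parameters, so existing callers must now pass Nothing (or a concrete value) where 0.1 took no argument. The following is a minimal, hypothetical caller against the 0.2 API; the bucket and key names, the file path, the Sydney region and the two-argument newEnv from the amazonka 1.4 series pinned in the .cabal are assumptions for illustration only, not part of this package:

{-# LANGUAGE OverloadedStrings #-}
module UploadSketch where

import Data.Conduit        (($$))
import Data.Conduit.Binary (sourceHandle)
import Network.AWS         (Credentials (Discover), Region (Sydney), newEnv,
                            runAWS, runResourceT)
import Network.AWS.S3.StreamingUpload   -- also re-exports createMultipartUpload
import System.IO           (stdin)

main :: IO ()
main = do
    -- amazonka-1.4 style environment setup: region plus credential discovery.
    env <- newEnv Sydney Discover

    -- streamUpload now takes a Maybe ChunkSize first; Nothing means "use minimumChunkSize".
    res <- runResourceT . runAWS env $
        sourceHandle stdin $$ streamUpload Nothing (createMultipartUpload "example-bucket" "example-key")
    print res

    -- concurrentUpload gained Maybe ChunkSize and Maybe NumThreads arguments;
    -- Nothing Nothing keeps the pre-0.2 behaviour.
    res' <- runResourceT . runAWS env $
        concurrentUpload Nothing Nothing (FP "/tmp/example.bin") (createMultipartUpload "example-bucket" "example-key")
    print res'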
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/amazonka-s3-streaming.cabal new/amazonka-s3-streaming-0.2.0.3/amazonka-s3-streaming.cabal
--- old/amazonka-s3-streaming-0.1.0.4/amazonka-s3-streaming.cabal    2016-12-24 12:29:54.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/amazonka-s3-streaming.cabal    2017-06-05 05:41:31.000000000 +0200
@@ -1,5 +1,5 @@
 name:                amazonka-s3-streaming
-version:             0.1.0.4
+version:             0.2.0.3
 cabal-version:       >=1.10
 build-type:          Simple
 license:             BSD3
@@ -14,6 +14,7 @@
 author:              Alex Mason
 
 extra-source-files:  README.md
+                     Changelog.md
 
 source-repository head
   type: git
@@ -35,13 +36,15 @@
                        resourcet >=1.1.7.4 && <1.2,
                        conduit >=1.2.6.6 && <1.3,
                        bytestring >=0.10.6.0 && <0.11,
-                       mmorph >=1.0.6 && <1.1,
+                       mmorph >=1.0.6 && <1.2,
                        lens >=4.13 && <5.0,
                        mtl >=2.2.1 && <2.3,
                        exceptions >=0.8.2.1 && <0.9,
-                       dlist >=0.8.0.2 && <0.9,
-                       lifted-async >=0.9.0 && <0.10,
-                       mmap >=0.5.9 && <0.6
+                       dlist ==0.8.*,
+                       lifted-async ==0.9.*,
+                       mmap ==0.5.*,
+                       deepseq ==1.4.*,
+                       http-client >=0.4 && <0.6
 
   default-language:    Haskell2010
   hs-source-dirs:      src
@@ -52,11 +55,11 @@
                        amazonka >=1.4.3 && <1.5,
                        amazonka-core >=1.4.3 && <1.5,
                        amazonka-s3 >=1.4.3 && <1.5,
-                       amazonka-s3-streaming >=0.1.0.4 && <0.2,
+                       amazonka-s3-streaming >=0.2.0.3 && <0.3,
                        conduit-extra >=1.1.15 && <1.2,
                        conduit >=1.2.8 && <1.3,
                        bytestring >=0.10.8.1 && <0.11,
                        text >=1.2.2.1 && <1.3
 
   default-language:    Haskell2010
-  ghc-options:         -threaded -rtsopts -with-rtsopts=-N
+  ghc-options:         -threaded -rtsopts -with-rtsopts=-N -O2
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/src/Network/AWS/S3/StreamingUpload.hs new/amazonka-s3-streaming-0.2.0.3/src/Network/AWS/S3/StreamingUpload.hs
--- old/amazonka-s3-streaming-0.1.0.4/src/Network/AWS/S3/StreamingUpload.hs    2016-12-24 11:33:43.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/src/Network/AWS/S3/StreamingUpload.hs    2017-06-05 05:39:00.000000000 +0200
@@ -1,4 +1,5 @@
 {-# LANGUAGE BangPatterns          #-}
+{-# LANGUAGE CPP                   #-}
 {-# LANGUAGE ConstraintKinds       #-}
 {-# LANGUAGE FlexibleContexts      #-}
 {-# LANGUAGE MultiParamTypeClasses #-}
@@ -15,66 +16,72 @@
     , abortAllUploads
     , module Network.AWS.S3.CreateMultipartUpload
     , module Network.AWS.S3.CompleteMultipartUpload
-    , chunkSize
+    , minimumChunkSize
     ) where
 
-import Network.AWS
-    ( HasEnv(..)
-    , LogLevel(..)
-    , MonadAWS
-    , getFileSize
-    , hashedBody
-    , send
-    , toBody
-    )
-
-import Control.Monad.Trans.AWS (AWSConstraint)
-import Network.AWS.Data.Crypto
-    (Digest, SHA256, hashFinalize, hashInit, hashUpdate)
-
-import Network.AWS.S3.AbortMultipartUpload
-import Network.AWS.S3.CompleteMultipartUpload
-import Network.AWS.S3.CreateMultipartUpload
-import Network.AWS.S3.ListMultipartUploads
-import Network.AWS.S3.Types
-    ( BucketName
-    , cmuParts
-    , completedMultipartUpload
-    , completedPart
-    , muKey
-    , muUploadId
-    )
-import Network.AWS.S3.UploadPart
-
-import Control.Applicative
-import Control.Category ((>>>))
-import Control.Monad (forM_, when, (>=>))
-import Control.Monad.IO.Class (MonadIO, liftIO)
-import Control.Monad.Morph (lift)
-import Control.Monad.Trans.Resource (MonadBaseControl, MonadResource, throwM)
-
-import Data.Conduit (Sink, await)
-import Data.Conduit.List (sourceList)
-
-import Data.ByteString (ByteString)
-import qualified Data.ByteString as BS
-import Data.ByteString.Builder (stringUtf8)
-import System.IO.MMap (mmapFileByteString)
-
-import qualified Data.DList as D
-import Data.List (unfoldr)
-import Data.List.NonEmpty (nonEmpty)
+import Network.AWS                       (HasEnv(..),
+                                          LogLevel(..), MonadAWS,
+                                          getFileSize,
+                                          hashedBody, send,
+                                          toBody)
+
+import Control.Monad.Trans.AWS           (AWSConstraint)
+import Network.AWS.Data.Crypto           (Digest, SHA256,
+                                          hashFinalize, hashInit,
+                                          hashUpdate)
+
+import Network.AWS.S3.AbortMultipartUpload
+import Network.AWS.S3.CompleteMultipartUpload
+import Network.AWS.S3.CreateMultipartUpload
+import Network.AWS.S3.ListMultipartUploads
+import Network.AWS.S3.Types              (BucketName, cmuParts, completedMultipartUpload,
                                          completedPart, muKey,
+                                          muUploadId)
+import Network.AWS.S3.UploadPart
+
+import Control.Applicative
+import Control.Category                  ((>>>))
+import Control.Monad                     (forM_, when, (>=>))
+import Control.Monad.IO.Class            (MonadIO, liftIO)
+import Control.Monad.Morph               (lift)
+import Control.Monad.Reader.Class        (local)
+import Control.Monad.Trans.Resource      (MonadBaseControl,
+                                          MonadResource, throwM)
+
+import Data.Conduit                      (Sink, await)
+import Data.Conduit.List                 (sourceList)
+
+import Data.ByteString                   (ByteString)
+import qualified Data.ByteString as BS
+import Data.ByteString.Builder           (stringUtf8)
+import System.IO.MMap                    (mmapFileByteString)
+
+import Control.DeepSeq                   (rnf)
+import qualified Data.DList as D
+import Data.List                         (unfoldr)
+import Data.List.NonEmpty                (nonEmpty)
+
+import Control.Exception.Lens            (catching, handling)
+import Control.Lens
+
+import Text.Printf                       (printf)
+
+import Control.Concurrent                (newQSem, signalQSem,
                                          waitQSem)
+import Control.Concurrent.Async.Lifted   (forConcurrently)
+import System.Mem                        (performGC)
+
+import Network.HTTP.Client               (defaultManagerSettings,
                                          managerConnCount,
+                                          newManager)
+import Network.HTTP.Client.Internal      (mMaxConns)
-import Control.Exception.Lens (catching, handling)
-import Control.Lens
-
-import Text.Printf (printf)
-
-import Control.Concurrent.Async.Lifted (forConcurrently)
+type ChunkSize = Int
+type NumThreads = Int
 
 -- | Minimum size of data which will be sent in a single part, currently 6MB
-chunkSize :: Int
-chunkSize = 6*1024*1024 -- Making this 5MB+1 seemed to cause AWS to complain
+minimumChunkSize :: ChunkSize
+minimumChunkSize = 6*1024*1024 -- Making this 5MB+1 seemed to cause AWS to complain
 
 {- |
@@ -91,15 +98,17 @@
 
 May throw 'Network.AWS.Error'
 -}
-streamUpload :: (MonadResource m, MonadAWS m, AWSConstraint r m)
-             => CreateMultipartUpload
+streamUpload :: (MonadResource m, AWSConstraint r m, MonadAWS m)
+             => Maybe ChunkSize -- ^ Optional chunk size
+             -> CreateMultipartUpload -- ^ Upload location
              -> Sink ByteString m CompleteMultipartUploadResponse
-streamUpload cmu = do
+streamUpload mcs cmu = do
     logger <- lift $ view envLogger
     let logStr :: MonadIO m => String -> m ()
-        logStr = liftIO . logger Info . stringUtf8
+        logStr = liftIO . logger Debug . stringUtf8
+        chunkSize = maybe minimumChunkSize (max minimumChunkSize) mcs
 
-    cmur <- lift (send cmu)
+    cmur <- lift $ send cmu
 
     when (cmur ^. cmursResponseStatus /= 200) $
        fail "Failed to create upload"
@@ -119,18 +128,26 @@
                         (hashFinalize $ hashUpdate ctx bs)
                         (D.snoc bss bs)
 
-                logStr $ printf "\n**** Uploaded part %d size $d\n" partnum bufsize
-
+                logStr $ printf "\n**** Uploaded part %d size %d\n" partnum bufsize
                 let part = completedPart partnum <$> (rs ^. uprsETag)
-                go empty 0 hashInit (partnum+1) $ D.snoc completed part
+#if MIN_VERSION_amazonka_s3(1,4,1)
+                    !_ = rnf part
+#endif
+                liftIO performGC
+                go empty 0 hashInit (partnum+1) . D.snoc completed $! part
 
             Nothing -> lift $ do
-                rs <- partUploader partnum bufsize (hashFinalize ctx) bss
-
-                logStr $ printf "\n**** Uploaded (final) part %d size $d\n" partnum bufsize
-
-                let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag)
-                    prts = nonEmpty =<< sequence allParts
+                prts <- if bufsize > 0
+                    then do
+                        rs <- partUploader partnum bufsize (hashFinalize ctx) bss
+
+                        logStr $ printf "\n**** Uploaded (final) part %d size %d\n" partnum bufsize
+
+                        let allParts = D.toList $ D.snoc completed $ completedPart partnum <$> (rs ^. uprsETag)
+                        pure $ nonEmpty =<< sequence allParts
+                    else do
+                        logStr $ printf "\n**** No final data to upload\n"
+                        pure $ nonEmpty =<< sequence (D.toList completed)
 
                 send $ completeMultipartUpload bucket key upId
                         & cMultipartUpload ?~ set cmuParts prts completedMultipartUpload
@@ -167,37 +184,68 @@
 
 {-|
 Allows a file or 'ByteString' to be uploaded concurrently, using the
-async library. 'ByteString's are split into 'chunkSize' chunks
-and uploaded directly.
+async library. The chunk size may optionally be specified, but will be at least
+`minimumChunkSize`, and may be made larger than if the `ByteString` or file
+is larger enough to cause more than 10,000 chunks.
 
 Files are mmapped into 'chunkSize' chunks and each chunk is uploaded in parallel.
 This considerably reduces the memory necessary compared to reading the contents
 into memory as a strict 'ByteString'. The usual caveats about mmaped files apply:
 if the file is modified during this operation, the data may become corrupt.
 
-May throw `Network.AWS.Error`, or `IOError`.
+May throw `Network.AWS.Error`, or `IOError`; an attempt is made to cancel the
+multipart upload on any error, but this may also fail if, for example, the network
+connection has been broken. See `abortAllUploads` for a crude cleanup method.
 -}
-concurrentUpload :: (MonadAWS m, MonadBaseControl IO m)
-                 => UploadLocation -> CreateMultipartUpload -> m CompleteMultipartUploadResponse
-concurrentUpload ud cmu = do
+concurrentUpload :: (AWSConstraint r m, MonadAWS m, MonadBaseControl IO m)
+                 => Maybe ChunkSize -- ^ Optional chunk size
+                 -> Maybe NumThreads -- ^ Optional number of threads to upload with
+                 -> UploadLocation -- ^ Whether to upload a file on disk or a `ByteString` that's already in memory.
+                 -> CreateMultipartUpload -- ^ Description of where to upload.
+                 -> m CompleteMultipartUploadResponse
+concurrentUpload mcs mnt ud cmu = do
     cmur <- send cmu
     when (cmur ^. cmursResponseStatus /= 200) $ fail "Failed to create upload"
+    logger <- view envLogger
+    let logStr :: MonadIO m => String -> m ()
+        logStr = liftIO . logger Info . stringUtf8
     let Just upId = cmur ^. cmursUploadId
        bucket = cmu ^. cmuBucket
       key    = cmu ^. cmuKey
+
+        calcChunkSize :: Int -> Int
+        calcChunkSize len =
+            let chunkSize' = maybe minimumChunkSize (max minimumChunkSize) mcs
+            in if len `div` chunkSize' >= 10000 then len `div` 9999 else chunkSize'
+
         -- hndlr :: SomeException -> m CompleteMultipartUploadResponse
        hndlr e = send (abortMultipartUpload bucket key upId) >> throwM e
-
-    handling id hndlr $ do
+    mgr <- view envManager
+    let mConnCount = mMaxConns mgr
+        nThreads   = maybe mConnCount (max 1) mnt
+        exec run   = if maybe False (> mConnCount) mnt
+                     then do
+                         mgr' <- liftIO $ newManager defaultManagerSettings{managerConnCount = nThreads}
+                         local (envManager .~ mgr') run
+                     else run
+    exec $ handling id hndlr $ do
+        sem <- liftIO $ newQSem nThreads
        umrs <- case ud of
-            BS bs -> forConcurrently (zip [1..] $ chunksOf chunkSize bs) $ \(partnum, b) -> do
-                umr <- send . uploadPart bucket key partnum upId . toBody $ b
-                pure $ completedPart partnum <$> (umr ^. uprsETag)
+            BS bs ->
+                let chunkSize = calcChunkSize $ BS.length bs
+                in forConcurrently (zip [1..] $ chunksOf chunkSize bs) $ \(partnum, b) -> do
+                    liftIO $ waitQSem sem
+                    logStr $ "Starting part: " ++ show partnum
+                    umr <- send . uploadPart bucket key partnum upId . toBody $ b
+                    logStr $ "Finished part: " ++ show partnum
+                    liftIO $ signalQSem sem
+                    pure $ completedPart partnum <$> (umr ^. uprsETag)
 
             FP fp -> do
                 fsize <- liftIO $ getFileSize fp
-                let (count,lst) = divMod (fromIntegral fsize) chunkSize
+                let chunkSize   = calcChunkSize $ fromIntegral fsize
+                    (count,lst) = divMod (fromIntegral fsize) chunkSize
                     params = [(partnum, chunkSize*offset, size) | partnum <- [1..]
                              | offset <- [0..count]
@@ -205,8 +253,10 @@
                             ]
 
                forConcurrently params $ \(partnum,off,size) -> do
+                    liftIO $ waitQSem sem
                    b <- liftIO $ mmapFileByteString fp (Just (fromIntegral off,size))
                    umr <- send . uploadPart bucket key partnum upId . toBody $ b
+                    liftIO $ signalQSem sem
                    pure $ completedPart partnum <$> (umr ^. uprsETag)
 
        let prts = nonEmpty =<< sequence umrs
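The calcChunkSize helper introduced in the hunk above is the fix for the 10,000-part limit mentioned in the changelog: when the payload divided by the requested chunk size would reach 10,000 parts, the chunk size is recomputed as len `div` 9999 so that at most 9,999 full-size parts (plus one remainder part) are ever created. A self-contained sketch of that arithmetic follows; the explicit mcs parameter and the 100 GiB payload are illustrative, since in the library mcs is captured from the enclosing function:

-- Standalone sketch of the chunk-size clamp shown in the diff; values are illustrative.
minimumChunkSize :: Int
minimumChunkSize = 6 * 1024 * 1024           -- 6MB lower bound, as in the library

calcChunkSize :: Maybe Int -> Int -> Int
calcChunkSize mcs len =
    let chunkSize' = maybe minimumChunkSize (max minimumChunkSize) mcs
    in if len `div` chunkSize' >= 10000      -- would hit S3's 10,000-part ceiling
       then len `div` 9999                   -- spread the payload over at most 9,999 full parts
       else chunkSize'

main :: IO ()
main = do
    let hundredGiB = 100 * 1024 * 1024 * 1024 :: Int
    print (hundredGiB `div` minimumChunkSize)                    -- 17066 full 6MB parts: too many
    print (calcChunkSize Nothing hundredGiB)                     -- clamped chunk size, larger than 6MB
    print (hundredGiB `div` calcChunkSize Nothing hundredGiB)    -- 9999 full parts, within the limit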