Hello community,
here is the log from the commit of package ghc-amazonka-s3-streaming for
openSUSE:Factory checked in at 2017-08-31 20:50:02
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/ghc-amazonka-s3-streaming (Old)
and /work/SRC/openSUSE:Factory/.ghc-amazonka-s3-streaming.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "ghc-amazonka-s3-streaming"
Thu Aug 31 20:50:02 2017 rev:3 rq:513200 version:0.2.0.3
Changes:
--------
---
/work/SRC/openSUSE:Factory/ghc-amazonka-s3-streaming/ghc-amazonka-s3-streaming.changes
2017-07-28 09:47:55.153927102 +0200
+++
/work/SRC/openSUSE:Factory/.ghc-amazonka-s3-streaming.new/ghc-amazonka-s3-streaming.changes
2017-08-31 20:50:04.125305973 +0200
@@ -1,0 +2,5 @@
+Thu Jul 27 14:07:03 UTC 2017 - [email protected]
+
+- Update to version 0.2.0.3.
+
+-------------------------------------------------------------------
Old:
----
amazonka-s3-streaming-0.1.0.4.tar.gz
New:
----
amazonka-s3-streaming-0.2.0.3.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ ghc-amazonka-s3-streaming.spec ++++++
--- /var/tmp/diff_new_pack.oYwQwi/_old 2017-08-31 20:50:05.081171798 +0200
+++ /var/tmp/diff_new_pack.oYwQwi/_new 2017-08-31 20:50:05.093170114 +0200
@@ -18,7 +18,7 @@
%global pkg_name amazonka-s3-streaming
Name: ghc-%{pkg_name}
-Version: 0.1.0.4
+Version: 0.2.0.3
Release: 0
Summary: Provides conduits to upload data to S3 using the Multipart API
License: MIT
@@ -33,8 +33,10 @@
BuildRequires: ghc-bytestring-devel
BuildRequires: ghc-conduit-devel
BuildRequires: ghc-conduit-extra-devel
+BuildRequires: ghc-deepseq-devel
BuildRequires: ghc-dlist-devel
BuildRequires: ghc-exceptions-devel
+BuildRequires: ghc-http-client-devel
BuildRequires: ghc-lens-devel
BuildRequires: ghc-lifted-async-devel
BuildRequires: ghc-mmap-devel
@@ -83,6 +85,6 @@
%files devel -f %{name}-devel.files
%defattr(-,root,root,-)
-%doc README.md
+%doc Changelog.md README.md
%changelog
++++++ amazonka-s3-streaming-0.1.0.4.tar.gz ->
amazonka-s3-streaming-0.2.0.3.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/Changelog.md
new/amazonka-s3-streaming-0.2.0.3/Changelog.md
--- old/amazonka-s3-streaming-0.1.0.4/Changelog.md 1970-01-01
01:00:00.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/Changelog.md 2017-06-05
05:40:29.000000000 +0200
@@ -0,0 +1,20 @@
+# Changelog - amazonka-s3-streaming
+
+# 0.2.0.3
+ * Make all library generated messages use Debug level not Info
+
+# 0.2.0.2
+ * Update to mmorph < 1.2
+
+## 0.2.0.1
+ * Fixed a bug with the printf format strings which would lead to a crash
(Thanks @JakeOShannessy
+ for reporting).
+
+## 0.2.0.0
+ * Fixed a potential bug with very large uploads where the chunksize might be
too small
+ for the limit of 10,000 chunks per upload (#6).
+ * Change API to allow the user to specify a chunk size for streaming if the
user knows
+ more about the data than we do.
+ * Allow the user to specify how many concurrent threads to use for
`concurrentUpload`
+ as well as chunk size (#4).
+ * Better specify cabal dependency ranges.
\ No newline at end of file
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/Main.hs
new/amazonka-s3-streaming-0.2.0.3/Main.hs
--- old/amazonka-s3-streaming-0.1.0.4/Main.hs 2016-12-24 12:01:09.000000000
+0100
+++ new/amazonka-s3-streaming-0.2.0.3/Main.hs 2017-02-13 10:34:19.000000000
+0100
@@ -3,20 +3,21 @@
module Main where
-import Data.Conduit (($$))
-import Data.Conduit.Binary (sourceHandle)
-import Data.Text (pack)
-
-import Network.AWS
-import Network.AWS.Data.Text (fromText)
-import Network.AWS.S3.CreateMultipartUpload
-import Network.AWS.S3.StreamingUpload
-
-import System.Environment
-import System.IO (BufferMode(BlockBuffering), hSetBuffering, stdin)
+import Data.Conduit (($$))
+import Data.Conduit.Binary (sourceHandle)
+import Data.Text (pack)
+
+import Network.AWS
+import Network.AWS.Data.Text (fromText)
+import Network.AWS.S3.CreateMultipartUpload
+import Network.AWS.S3.StreamingUpload
+
+import System.Environment
+import System.IO
(BufferMode(BlockBuffering),
+ hSetBuffering, stdin)
#if !MIN_VERSION_base(4,8,0)
-import Control.Applicative (pure, (<$>), (<*>))
+import Control.Applicative (pure, (<$>), (<*>))
#endif
main :: IO ()
@@ -38,9 +39,9 @@
hSetBuffering stdin (BlockBuffering Nothing)
res <- runResourceT . runAWS env $ case file of
-- Stream data from stdin
- "-" -> sourceHandle stdin $$ streamUpload
(createMultipartUpload buck ky)
+ "-" -> sourceHandle stdin $$ streamUpload Nothing
(createMultipartUpload buck ky)
-- Read from a file
- _ -> concurrentUpload (FP file) $ createMultipartUpload
buck ky
+ _ -> concurrentUpload Nothing Nothing (FP file) $
createMultipartUpload buck ky
print res
Left err -> print err >> usage
("abort":region:profile:credfile:bucket:_) ->
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/amazonka-s3-streaming-0.1.0.4/README.md
new/amazonka-s3-streaming-0.2.0.3/README.md
--- old/amazonka-s3-streaming-0.1.0.4/README.md 2016-12-24 12:00:15.000000000
+0100
+++ new/amazonka-s3-streaming-0.2.0.3/README.md 2017-05-24 04:37:45.000000000
+0200
@@ -1,3 +1,5 @@
# amazonka-s3-streaming [](https://travis-ci.org/axman6/amazonka-s3-streaming)
-Provides a conduit based streaming interface and a concurrent interface to
uploading data to S3 using the Multipart API.
+Provides a conduit based streaming interface and a concurrent interface to
uploading data to S3 using the Multipart API. Also provides methods to upload
files or bytestrings of known size in parallel.
+
+The documentation can be found on
[Hackage](https://hackage.haskell.org/package/amazonka-s3-streaming).
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/amazonka-s3-streaming-0.1.0.4/amazonka-s3-streaming.cabal
new/amazonka-s3-streaming-0.2.0.3/amazonka-s3-streaming.cabal
--- old/amazonka-s3-streaming-0.1.0.4/amazonka-s3-streaming.cabal
2016-12-24 12:29:54.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/amazonka-s3-streaming.cabal
2017-06-05 05:41:31.000000000 +0200
@@ -1,5 +1,5 @@
name: amazonka-s3-streaming
-version: 0.1.0.4
+version: 0.2.0.3
cabal-version: >=1.10
build-type: Simple
license: BSD3
@@ -14,6 +14,7 @@
author: Alex Mason
extra-source-files:
README.md
+ Changelog.md
source-repository head
type: git
@@ -35,13 +36,15 @@
resourcet >=1.1.7.4 && <1.2,
conduit >=1.2.6.6 && <1.3,
bytestring >=0.10.6.0 && <0.11,
- mmorph >=1.0.6 && <1.1,
+ mmorph >=1.0.6 && <1.2,
lens >=4.13 && <5.0,
mtl >=2.2.1 && <2.3,
exceptions >=0.8.2.1 && <0.9,
- dlist >=0.8.0.2 && <0.9,
- lifted-async >=0.9.0 && <0.10,
- mmap >=0.5.9 && <0.6
+ dlist ==0.8.*,
+ lifted-async ==0.9.*,
+ mmap ==0.5.*,
+ deepseq ==1.4.*,
+ http-client >=0.4 && <0.6
default-language: Haskell2010
hs-source-dirs: src
@@ -52,11 +55,11 @@
amazonka >=1.4.3 && <1.5,
amazonka-core >=1.4.3 && <1.5,
amazonka-s3 >=1.4.3 && <1.5,
- amazonka-s3-streaming >=0.1.0.4 && <0.2,
+ amazonka-s3-streaming >=0.2.0.3 && <0.3,
conduit-extra >=1.1.15 && <1.2,
conduit >=1.2.8 && <1.3,
bytestring >=0.10.8.1 && <0.11,
text >=1.2.2.1 && <1.3
default-language: Haskell2010
- ghc-options: -threaded -rtsopts -with-rtsopts=-N
+ ghc-options: -threaded -rtsopts -with-rtsopts=-N -O2
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/amazonka-s3-streaming-0.1.0.4/src/Network/AWS/S3/StreamingUpload.hs
new/amazonka-s3-streaming-0.2.0.3/src/Network/AWS/S3/StreamingUpload.hs
--- old/amazonka-s3-streaming-0.1.0.4/src/Network/AWS/S3/StreamingUpload.hs
2016-12-24 11:33:43.000000000 +0100
+++ new/amazonka-s3-streaming-0.2.0.3/src/Network/AWS/S3/StreamingUpload.hs
2017-06-05 05:39:00.000000000 +0200
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
@@ -15,66 +16,72 @@
, abortAllUploads
, module Network.AWS.S3.CreateMultipartUpload
, module Network.AWS.S3.CompleteMultipartUpload
- , chunkSize
+ , minimumChunkSize
) where
-import Network.AWS
- ( HasEnv(..)
- , LogLevel(..)
- , MonadAWS
- , getFileSize
- , hashedBody
- , send
- , toBody
- )
-
-import Control.Monad.Trans.AWS (AWSConstraint)
-import Network.AWS.Data.Crypto
- (Digest, SHA256, hashFinalize, hashInit, hashUpdate)
-
-import Network.AWS.S3.AbortMultipartUpload
-import Network.AWS.S3.CompleteMultipartUpload
-import Network.AWS.S3.CreateMultipartUpload
-import Network.AWS.S3.ListMultipartUploads
-import Network.AWS.S3.Types
- ( BucketName
- , cmuParts
- , completedMultipartUpload
- , completedPart
- , muKey
- , muUploadId
- )
-import Network.AWS.S3.UploadPart
-
-import Control.Applicative
-import Control.Category ((>>>))
-import Control.Monad (forM_, when, (>=>))
-import Control.Monad.IO.Class (MonadIO, liftIO)
-import Control.Monad.Morph (lift)
-import Control.Monad.Trans.Resource (MonadBaseControl, MonadResource, throwM)
-
-import Data.Conduit (Sink, await)
-import Data.Conduit.List (sourceList)
-
-import Data.ByteString (ByteString)
-import qualified Data.ByteString as BS
-import Data.ByteString.Builder (stringUtf8)
-import System.IO.MMap (mmapFileByteString)
-
-import qualified Data.DList as D
-import Data.List (unfoldr)
-import Data.List.NonEmpty (nonEmpty)
+import Network.AWS (HasEnv(..),
+ LogLevel(..),
MonadAWS,
+ getFileSize,
+ hashedBody, send,
+ toBody)
+
+import Control.Monad.Trans.AWS (AWSConstraint)
+import Network.AWS.Data.Crypto (Digest, SHA256,
+ hashFinalize,
hashInit,
+ hashUpdate)
+
+import Network.AWS.S3.AbortMultipartUpload
+import Network.AWS.S3.CompleteMultipartUpload
+import Network.AWS.S3.CreateMultipartUpload
+import Network.AWS.S3.ListMultipartUploads
+import Network.AWS.S3.Types (BucketName, cmuParts,
completedMultipartUpload,
+ completedPart, muKey,
+ muUploadId)
+import Network.AWS.S3.UploadPart
+
+import Control.Applicative
+import Control.Category ((>>>))
+import Control.Monad (forM_, when, (>=>))
+import Control.Monad.IO.Class (MonadIO, liftIO)
+import Control.Monad.Morph (lift)
+import Control.Monad.Reader.Class (local)
+import Control.Monad.Trans.Resource (MonadBaseControl,
+ MonadResource, throwM)
+
+import Data.Conduit (Sink, await)
+import Data.Conduit.List (sourceList)
+
+import Data.ByteString (ByteString)
+import qualified Data.ByteString as BS
+import Data.ByteString.Builder (stringUtf8)
+import System.IO.MMap (mmapFileByteString)
+
+import Control.DeepSeq (rnf)
+import qualified Data.DList as D
+import Data.List (unfoldr)
+import Data.List.NonEmpty (nonEmpty)
+
+import Control.Exception.Lens (catching, handling)
+import Control.Lens
+
+import Text.Printf (printf)
+
+import Control.Concurrent (newQSem, signalQSem,
+ waitQSem)
+import Control.Concurrent.Async.Lifted (forConcurrently)
+import System.Mem (performGC)
+
+import Network.HTTP.Client
(defaultManagerSettings,
+ managerConnCount,
+ newManager)
+import Network.HTTP.Client.Internal (mMaxConns)
-import Control.Exception.Lens (catching, handling)
-import Control.Lens
-
-import Text.Printf (printf)
-
-import Control.Concurrent.Async.Lifted (forConcurrently)
+type ChunkSize = Int
+type NumThreads = Int
-- | Minimum size of data which will be sent in a single part, currently 6MB
-chunkSize :: Int
-chunkSize = 6*1024*1024 -- Making this 5MB+1 seemed to cause AWS to complain
+minimumChunkSize :: ChunkSize
+minimumChunkSize = 6*1024*1024 -- Making this 5MB+1 seemed to cause AWS to
complain
{- |
@@ -91,15 +98,17 @@
May throw 'Network.AWS.Error'
-}
-streamUpload :: (MonadResource m, MonadAWS m, AWSConstraint r m)
- => CreateMultipartUpload
+streamUpload :: (MonadResource m, AWSConstraint r m, MonadAWS m)
+ => Maybe ChunkSize -- ^ Optional chunk size
+ -> CreateMultipartUpload -- ^ Upload location
-> Sink ByteString m CompleteMultipartUploadResponse
-streamUpload cmu = do
+streamUpload mcs cmu = do
logger <- lift $ view envLogger
let logStr :: MonadIO m => String -> m ()
- logStr = liftIO . logger Info . stringUtf8
+ logStr = liftIO . logger Debug . stringUtf8
+ chunkSize = maybe minimumChunkSize (max minimumChunkSize) mcs
- cmur <- lift (send cmu)
+ cmur <- lift $ send cmu
when (cmur ^. cmursResponseStatus /= 200) $
fail "Failed to create upload"
@@ -119,18 +128,26 @@
(hashFinalize $ hashUpdate ctx
bs)
(D.snoc bss bs)
- logStr $ printf "\n**** Uploaded part %d size $d\n"
partnum bufsize
-
+ logStr $ printf "\n**** Uploaded part %d size %d\n"
partnum bufsize
let part = completedPart partnum <$> (rs ^. uprsETag)
- go empty 0 hashInit (partnum+1) $ D.snoc completed part
+#if MIN_VERSION_amazonka_s3(1,4,1)
+ !_ = rnf part
+#endif
+ liftIO performGC
+ go empty 0 hashInit (partnum+1) . D.snoc completed $! part
Nothing -> lift $ do
- rs <- partUploader partnum bufsize (hashFinalize ctx) bss
-
- logStr $ printf "\n**** Uploaded (final) part %d size $d\n"
partnum bufsize
-
- let allParts = D.toList $ D.snoc completed $ completedPart partnum
<$> (rs ^. uprsETag)
- prts = nonEmpty =<< sequence allParts
+ prts <- if bufsize > 0
+ then do
+ rs <- partUploader partnum bufsize (hashFinalize ctx) bss
+
+ logStr $ printf "\n**** Uploaded (final) part %d size
%d\n" partnum bufsize
+
+ let allParts = D.toList $ D.snoc completed $ completedPart
partnum <$> (rs ^. uprsETag)
+ pure $ nonEmpty =<< sequence allParts
+ else do
+ logStr $ printf "\n**** No final data to upload\n"
+ pure $ nonEmpty =<< sequence (D.toList completed)
send $ completeMultipartUpload bucket key upId
& cMultipartUpload ?~ set cmuParts prts
completedMultipartUpload
@@ -167,37 +184,68 @@
{-|
Allows a file or 'ByteString' to be uploaded concurrently, using the
-async library. 'ByteString's are split into 'chunkSize' chunks
-and uploaded directly.
+async library. The chunk size may optionally be specified, but will be at
least
+`minimumChunkSize`, and may be made larger if the `ByteString` or file
+is large enough to cause more than 10,000 chunks.
Files are mmapped into 'chunkSize' chunks and each chunk is uploaded in
parallel.
This considerably reduces the memory necessary compared to reading the contents
into memory as a strict 'ByteString'. The usual caveats about mmaped files
apply:
if the file is modified during this operation, the data may become corrupt.
-May throw `Network.AWS.Error`, or `IOError`.
+May throw `Network.AWS.Error`, or `IOError`; an attempt is made to cancel the
+multipart upload on any error, but this may also fail if, for example, the
network
+connection has been broken. See `abortAllUploads` for a crude cleanup method.
-}
-concurrentUpload :: (MonadAWS m, MonadBaseControl IO m)
- => UploadLocation -> CreateMultipartUpload -> m
CompleteMultipartUploadResponse
-concurrentUpload ud cmu = do
+concurrentUpload :: (AWSConstraint r m, MonadAWS m, MonadBaseControl IO m)
+ => Maybe ChunkSize -- ^ Optional chunk size
+ -> Maybe NumThreads -- ^ Optional number of threads to upload
with
+ -> UploadLocation -- ^ Whether to upload a file on disk or a
`ByteString` that's already in memory.
+ -> CreateMultipartUpload -- ^ Description of where to upload.
+ -> m CompleteMultipartUploadResponse
+concurrentUpload mcs mnt ud cmu = do
cmur <- send cmu
when (cmur ^. cmursResponseStatus /= 200) $
fail "Failed to create upload"
+ logger <- view envLogger
+ let logStr :: MonadIO m => String -> m ()
+ logStr = liftIO . logger Info . stringUtf8
let Just upId = cmur ^. cmursUploadId
bucket = cmu ^. cmuBucket
key = cmu ^. cmuKey
+
+ calcChunkSize :: Int -> Int
+ calcChunkSize len =
+ let chunkSize' = maybe minimumChunkSize (max minimumChunkSize) mcs
+ in if len `div` chunkSize' >= 10000 then len `div` 9999 else
chunkSize'
+
-- hndlr :: SomeException -> m CompleteMultipartUploadResponse
hndlr e = send (abortMultipartUpload bucket key upId) >> throwM e
-
- handling id hndlr $ do
+ mgr <- view envManager
+ let mConnCount = mMaxConns mgr
+ nThreads = maybe mConnCount (max 1) mnt
+ exec run = if maybe False (> mConnCount) mnt
+ then do
+ mgr' <- liftIO $ newManager
defaultManagerSettings{managerConnCount = nThreads}
+ local (envManager .~ mgr') run
+ else run
+ exec $ handling id hndlr $ do
+ sem <- liftIO $ newQSem nThreads
umrs <- case ud of
- BS bs -> forConcurrently (zip [1..] $ chunksOf chunkSize bs) $
\(partnum, b) -> do
- umr <- send . uploadPart bucket key partnum upId . toBody
$ b
- pure $ completedPart partnum <$> (umr ^. uprsETag)
+ BS bs ->
+ let chunkSize = calcChunkSize $ BS.length bs
+ in forConcurrently (zip [1..] $ chunksOf chunkSize bs) $
\(partnum, b) -> do
+ liftIO $ waitQSem sem
+ logStr $ "Starting part: " ++ show partnum
+ umr <- send . uploadPart bucket key partnum upId .
toBody $ b
+ logStr $ "Finished part: " ++ show partnum
+ liftIO $ signalQSem sem
+ pure $ completedPart partnum <$> (umr ^. uprsETag)
FP fp -> do
fsize <- liftIO $ getFileSize fp
- let (count,lst) = divMod (fromIntegral fsize) chunkSize
+ let chunkSize = calcChunkSize $ fromIntegral fsize
+ (count,lst) = divMod (fromIntegral fsize) chunkSize
params = [(partnum, chunkSize*offset, size)
| partnum <- [1..]
| offset <- [0..count]
@@ -205,8 +253,10 @@
]
forConcurrently params $ \(partnum,off,size) -> do
+ liftIO $ waitQSem sem
b <- liftIO $ mmapFileByteString fp (Just (fromIntegral
off,size))
umr <- send . uploadPart bucket key partnum upId . toBody
$ b
+ liftIO $ signalQSem sem
pure $ completedPart partnum <$> (umr ^. uprsETag)
let prts = nonEmpty =<< sequence umrs