Hello community, here is the log from the commit of package ghc-amqp for openSUSE:Factory checked in at 2017-08-31 20:50:04 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/ghc-amqp (Old) and /work/SRC/openSUSE:Factory/.ghc-amqp.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "ghc-amqp" Thu Aug 31 20:50:04 2017 rev:2 rq:513201 version:0.15.1 Changes: -------- --- /work/SRC/openSUSE:Factory/ghc-amqp/ghc-amqp.changes 2017-03-28 15:20:27.988485767 +0200 +++ /work/SRC/openSUSE:Factory/.ghc-amqp.new/ghc-amqp.changes 2017-08-31 20:50:05.897057273 +0200 @@ -1,0 +2,5 @@ +Thu Jul 27 14:06:00 UTC 2017 - psim...@suse.com + +- Update to version 0.15.1. + +------------------------------------------------------------------- Old: ---- amqp-0.14.1.tar.gz New: ---- amqp-0.15.1.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ ghc-amqp.spec ++++++ --- /var/tmp/diff_new_pack.RHILrx/_old 2017-08-31 20:50:06.764935449 +0200 +++ /var/tmp/diff_new_pack.RHILrx/_new 2017-08-31 20:50:06.772934326 +0200 @@ -19,7 +19,7 @@ %global pkg_name amqp %bcond_with tests Name: ghc-%{pkg_name} -Version: 0.14.1 +Version: 0.15.1 Release: 0 Summary: Client library for AMQP servers (currently only RabbitMQ) License: MIT ++++++ amqp-0.14.1.tar.gz -> amqp-0.15.1.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/Network/AMQP/Internal.hs new/amqp-0.15.1/Network/AMQP/Internal.hs --- old/amqp-0.14.1/Network/AMQP/Internal.hs 2017-01-12 10:35:15.000000000 +0100 +++ new/amqp-0.15.1/Network/AMQP/Internal.hs 2017-06-30 14:14:47.000000000 +0200 @@ -278,7 +278,7 @@ -- connection: if the thread died for an unexpected exception, -- inform the channel threads downstream accordingly. Otherwise -- just use a normal 'killThread' finaliser. - let finaliser = case res of + let finaliser = ChanThreadKilledException $ case res of Left ex -> ex Right _ -> CE.toException CE.ThreadKilled modifyMVar_ cChannels $ \x -> do @@ -486,6 +486,16 @@ chanExceptionHandlers :: MVar [CE.SomeException -> IO ()] } +-- | Thrown in the channel thread when the connection gets closed. +-- When handling exceptions in a subscription callback, make sure to re-throw this so the channel thread can be stopped. +data ChanThreadKilledException = ChanThreadKilledException { cause :: CE.SomeException } + deriving (Show) + +instance CE.Exception ChanThreadKilledException + +unwrapChanThreadKilledException :: CE.SomeException -> CE.SomeException +unwrapChanThreadKilledException e = maybe e cause $ CE.fromException e + msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message msgFromContentHeaderProperties (CHBasic content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp message_type user_id application_id cluster_id) body = let msgId = fromShortString message_id @@ -542,8 +552,11 @@ let env = Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered, envExchangeName = exchange, envRoutingKey = routingKey, envChannel = chan} - CE.catch (subscriber (msg, env)) - (\(e::CE.SomeException) -> hPutStrLn stderr $ "AMQP callback threw exception: " ++ show e) + CE.catches (subscriber (msg, env)) + [ + CE.Handler (\(e::ChanThreadKilledException) -> CE.throwIO $ cause e), + CE.Handler (\(e::CE.SomeException) -> hPutStrLn stderr $ "AMQP callback threw exception: " ++ show e) + ] Nothing -> -- got a message, but have no registered subscriber; so drop it return () @@ -656,7 +669,7 @@ closeChannel' newChannel "closed" case res of Right _ -> return () - Left ex -> readMVar handlers >>= mapM_ ($ ex) + Left ex -> readMVar handlers >>= mapM_ ($ unwrapChanThreadKilledException ex) when (IM.member newChannelID mp) $ CE.throwIO $ userError "openChannel fail: channel already open" return (IM.insert newChannelID (newChannel, thrID) mp, newChannel) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/Network/AMQP.hs new/amqp-0.15.1/Network/AMQP.hs --- old/amqp-0.14.1/Network/AMQP.hs 2017-01-12 10:35:15.000000000 +0100 +++ new/amqp-0.15.1/Network/AMQP.hs 2017-06-30 14:14:47.000000000 +0200 @@ -119,7 +119,8 @@ waitForConfirmsUntil, addConfirmationListener, ConfirmationResult(..), - + AckType(..), + -- * Flow Control flow, @@ -131,6 +132,7 @@ -- * Exceptions AMQPException(..), + ChanThreadKilledException, -- * URI parsing fromURI @@ -353,6 +355,8 @@ -- | @consumeMsgs chan queue ack callback@ subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If @ack == 'Ack'@ you will have to acknowledge all incoming messages (see 'ackMsg' and 'ackEnv') -- +-- If you do any exception handling inside the callback, you should make sure not to catch 'ChanThreadKilledException', or re-throw it if you did catch it, since it is used internally by the library to close channels. +-- -- NOTE: The callback will be run on the same thread as the channel thread (every channel spawns its own thread to listen for incoming data) so DO NOT perform any request on @chan@ inside the callback (however, you CAN perform requests on other open channels inside the callback, though I wouldn't recommend it). -- Functions that can safely be called on @chan@ are 'ackMsg', 'ackEnv', 'rejectMsg', 'recoverMsgs'. If you want to perform anything more complex, it's a good idea to wrap it inside 'forkIO'. consumeMsgs :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/amqp.cabal new/amqp-0.15.1/amqp.cabal --- old/amqp-0.14.1/amqp.cabal 2017-01-12 10:35:15.000000000 +0100 +++ new/amqp-0.15.1/amqp.cabal 2017-06-30 14:14:47.000000000 +0200 @@ -1,5 +1,5 @@ Name: amqp -Version: 0.14.1 +Version: 0.15.1 Synopsis: Client library for AMQP servers (currently only RabbitMQ) Description: Client library for AMQP servers (currently only RabbitMQ) License: BSD3 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/changelog.md new/amqp-0.15.1/changelog.md --- old/amqp-0.14.1/changelog.md 2017-01-12 10:35:15.000000000 +0100 +++ new/amqp-0.15.1/changelog.md 2017-06-30 14:14:47.000000000 +0200 @@ -1,3 +1,11 @@ +### Version 0.15.1 + +* export the `AckType` data-type and constructors + +### Version 0.15.0 + +* The way channels are closed internally was changed. This may affect you if you have installed an exception handler inside the callback passed to `consumeMsgs`. Specifically, the exceptions used internally to close channels are now wrapped inside `ChanThreadKilledException`. You should make sure to re-throw this exception if you did catch it. + ### Version 0.14.1 * show all exceptions if no host can be connected to diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/test/BasicPublishSpec.hs new/amqp-0.15.1/test/BasicPublishSpec.hs --- old/amqp-0.14.1/test/BasicPublishSpec.hs 2017-01-12 10:35:15.000000000 +0100 +++ new/amqp-0.15.1/test/BasicPublishSpec.hs 2017-06-30 14:14:47.000000000 +0200 @@ -6,8 +6,12 @@ import Network.AMQP import Data.ByteString.Lazy.Char8 as BL +import Data.Map (Map) +import qualified Data.Map as Map +import Data.Word import Control.Concurrent (threadDelay) +import Control.Concurrent.STM main :: IO () main = hspec spec @@ -60,3 +64,66 @@ _ <- deleteQueue ch q closeConnection conn + context "confirmSelect" $ do + it "receives a confirmation message" $ do + let q = "haskell-amqp.queues.publish-over-fanout1" + e = "haskell-amqp.fanout.d.na" + conn <- openConnection "127.0.0.1" "/" "guest" "guest" + ch <- openChannel conn + confirmSelect ch True + (confirmMap, counter) <- atomically $ (,) <$> newTVar Map.empty <*> newTVar 0 + addConfirmationListener ch (handleConfirms counter confirmMap) + _ <- declareExchange ch (newExchange {exchangeName = e, + exchangeType = "fanout", + exchangeDurable = True}) + + (_, _, _) <- declareQueue ch (newQueue {queueName = q, queueDurable = False}) + _ <- purgeQueue ch q + bindQueue ch q e "" + + + _ <- traverse (\n -> do + sn' <- publishMsg ch e "" (newMsg {msgBody = (BL.pack "hello")}) + case sn' of + Just sn -> atomically $ addSequenceNumber confirmMap (fromIntegral sn) n + Nothing -> return () + + ) [1..5] + + + threadDelay (1000 * 100) + + (_, n, _) <- declareQueue ch (newQueue {queueName = q, queueDurable = False}) + n `shouldBe` 5 + + cMap' <- atomically $ readTVar confirmMap + cMap' `shouldBe` Map.empty + + counter' <- atomically $ readTVar counter + counter' `shouldBe` 5 + + _ <- deleteQueue ch q + closeConnection conn + + +addSequenceNumber :: TVar (Map Word64 Int) -> Word64 -> Int -> STM () +addSequenceNumber cMap sn n = modifyTVar' cMap (Map.insert sn n) + +removeSequenceNumber :: TVar (Map Word64 Int) -> Word64 -> STM () +removeSequenceNumber cMap sn = modifyTVar' cMap (Map.delete sn) + +increaseCounter :: TVar Int -> STM () +increaseCounter n = modifyTVar' n (+1) + +handleConfirms :: TVar Int -> TVar (Map Word64 Int) -> (Word64, Bool, AckType) -> IO () +handleConfirms c _ (_, False, BasicNack) = atomically $ increaseCounter c +handleConfirms c _ (_, True, BasicNack) = atomically $ increaseCounter c +handleConfirms c cMap (n, False, BasicAck) = atomically $ removeSequenceNumber cMap n >> increaseCounter c +handleConfirms c cMap (n, True, BasicAck) = atomically $ do + cMap' <- readTVar cMap + let (lt, eq', _) = Map.splitLookup n cMap' + case eq' of + Just _ -> removeSequenceNumber cMap n >> increaseCounter c + Nothing -> return () + _ <- traverse (\i -> removeSequenceNumber cMap i >> increaseCounter c) (Map.keys lt) + return ()