Hello community,

here is the log from the commit of package ghc-amqp for openSUSE:Factory 
checked in at 2017-08-31 20:50:04
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/ghc-amqp (Old)
 and      /work/SRC/openSUSE:Factory/.ghc-amqp.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "ghc-amqp"

Thu Aug 31 20:50:04 2017 rev:2 rq:513201 version:0.15.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/ghc-amqp/ghc-amqp.changes        2017-03-28 15:20:27.988485767 +0200
+++ /work/SRC/openSUSE:Factory/.ghc-amqp.new/ghc-amqp.changes   2017-08-31 20:50:05.897057273 +0200
@@ -1,0 +2,5 @@
+Thu Jul 27 14:06:00 UTC 2017 - psim...@suse.com
+
+- Update to version 0.15.1.
+
+-------------------------------------------------------------------

Old:
----
  amqp-0.14.1.tar.gz

New:
----
  amqp-0.15.1.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ ghc-amqp.spec ++++++
--- /var/tmp/diff_new_pack.RHILrx/_old  2017-08-31 20:50:06.764935449 +0200
+++ /var/tmp/diff_new_pack.RHILrx/_new  2017-08-31 20:50:06.772934326 +0200
@@ -19,7 +19,7 @@
 %global pkg_name amqp
 %bcond_with tests
 Name:           ghc-%{pkg_name}
-Version:        0.14.1
+Version:        0.15.1
 Release:        0
 Summary:        Client library for AMQP servers (currently only RabbitMQ)
 License:        MIT

++++++ amqp-0.14.1.tar.gz -> amqp-0.15.1.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/Network/AMQP/Internal.hs new/amqp-0.15.1/Network/AMQP/Internal.hs
--- old/amqp-0.14.1/Network/AMQP/Internal.hs    2017-01-12 10:35:15.000000000 +0100
+++ new/amqp-0.15.1/Network/AMQP/Internal.hs    2017-06-30 14:14:47.000000000 +0200
@@ -278,7 +278,7 @@
                 -- connection: if the thread died for an unexpected exception,
                 -- inform the channel threads downstream accordingly. Otherwise
                 -- just use a normal 'killThread' finaliser.
-                let finaliser = case res of
+                let finaliser = ChanThreadKilledException $ case res of
                         Left ex -> ex
                         Right _ -> CE.toException CE.ThreadKilled
                 modifyMVar_ cChannels $ \x -> do
@@ -486,6 +486,16 @@
                     chanExceptionHandlers :: MVar [CE.SomeException -> IO ()]
                 }
 
+-- | Thrown in the channel thread when the connection gets closed.
+-- When handling exceptions in a subscription callback, make sure to re-throw this so the channel thread can be stopped.
+data ChanThreadKilledException = ChanThreadKilledException { cause :: CE.SomeException }
+  deriving (Show)
+
+instance CE.Exception ChanThreadKilledException
+
+unwrapChanThreadKilledException :: CE.SomeException -> CE.SomeException
+unwrapChanThreadKilledException e = maybe e cause $ CE.fromException e
+
 msgFromContentHeaderProperties :: ContentHeaderProperties -> BL.ByteString -> Message
 msgFromContentHeaderProperties (CHBasic content_type content_encoding headers delivery_mode priority correlation_id reply_to expiration message_id timestamp message_type user_id application_id cluster_id) body =
     let msgId = fromShortString message_id
@@ -542,8 +552,11 @@
                     let env = Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered,
                                     envExchangeName = exchange, envRoutingKey = routingKey, envChannel = chan}
 
-                    CE.catch (subscriber (msg, env))
-                        (\(e::CE.SomeException) -> hPutStrLn stderr $ "AMQP callback threw exception: " ++ show e)
+                    CE.catches (subscriber (msg, env))
+                        [
+                          CE.Handler (\(e::ChanThreadKilledException) -> CE.throwIO $ cause e),
+                          CE.Handler (\(e::CE.SomeException) -> hPutStrLn stderr $ "AMQP callback threw exception: " ++ show e)
+                        ]
                 Nothing ->
                     -- got a message, but have no registered subscriber; so drop it
                     return ()
@@ -656,7 +669,7 @@
                    closeChannel' newChannel "closed"
                    case res of
                      Right _ -> return ()
-                     Left ex -> readMVar handlers >>= mapM_ ($ ex)
+                     Left ex -> readMVar handlers >>= mapM_ ($ unwrapChanThreadKilledException ex)
         when (IM.member newChannelID mp) $ CE.throwIO $ userError "openChannel fail: channel already open"
         return (IM.insert newChannelID (newChannel, thrID) mp, newChannel)
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/Network/AMQP.hs new/amqp-0.15.1/Network/AMQP.hs
--- old/amqp-0.14.1/Network/AMQP.hs     2017-01-12 10:35:15.000000000 +0100
+++ new/amqp-0.15.1/Network/AMQP.hs     2017-06-30 14:14:47.000000000 +0200
@@ -119,7 +119,8 @@
     waitForConfirmsUntil,
     addConfirmationListener,
     ConfirmationResult(..),
-
+    AckType(..),
+    
     -- * Flow Control
     flow,
 
@@ -131,6 +132,7 @@
 
     -- * Exceptions
     AMQPException(..),
+    ChanThreadKilledException,
 
     -- * URI parsing
     fromURI
@@ -353,6 +355,8 @@
 
 -- | @consumeMsgs chan queue ack callback@ subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If @ack == 'Ack'@ you will have to acknowledge all incoming messages (see 'ackMsg' and 'ackEnv')
 --
+-- If you do any exception handling inside the callback, you should make sure not to catch 'ChanThreadKilledException', or re-throw it if you did catch it, since it is used internally by the library to close channels.
+--
 -- NOTE: The callback will be run on the same thread as the channel thread (every channel spawns its own thread to listen for incoming data) so DO NOT perform any request on @chan@ inside the callback (however, you CAN perform requests on other open channels inside the callback, though I wouldn't recommend it).
 -- Functions that can safely be called on @chan@ are 'ackMsg', 'ackEnv', 'rejectMsg', 'recoverMsgs'. If you want to perform anything more complex, it's a good idea to wrap it inside 'forkIO'.
 consumeMsgs :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/amqp.cabal new/amqp-0.15.1/amqp.cabal
--- old/amqp-0.14.1/amqp.cabal  2017-01-12 10:35:15.000000000 +0100
+++ new/amqp-0.15.1/amqp.cabal  2017-06-30 14:14:47.000000000 +0200
@@ -1,5 +1,5 @@
 Name:                amqp
-Version:             0.14.1
+Version:             0.15.1
 Synopsis:            Client library for AMQP servers (currently only RabbitMQ)
 Description:         Client library for AMQP servers (currently only RabbitMQ)
 License:             BSD3
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/changelog.md new/amqp-0.15.1/changelog.md
--- old/amqp-0.14.1/changelog.md        2017-01-12 10:35:15.000000000 +0100
+++ new/amqp-0.15.1/changelog.md        2017-06-30 14:14:47.000000000 +0200
@@ -1,3 +1,11 @@
+### Version 0.15.1
+
+* export the `AckType` data-type and constructors
+
+### Version 0.15.0
+
+* The way channels are closed internally was changed. This may affect you if you have installed an exception handler inside the callback passed to `consumeMsgs`. Specifically, the exceptions used internally to close channels are now wrapped inside `ChanThreadKilledException`. You should make sure to re-throw this exception if you did catch it.
+
 ### Version 0.14.1
 
 * show all exceptions if no host can be connected to
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/amqp-0.14.1/test/BasicPublishSpec.hs new/amqp-0.15.1/test/BasicPublishSpec.hs
--- old/amqp-0.14.1/test/BasicPublishSpec.hs    2017-01-12 10:35:15.000000000 +0100
+++ new/amqp-0.15.1/test/BasicPublishSpec.hs    2017-06-30 14:14:47.000000000 +0200
@@ -6,8 +6,12 @@
 import Network.AMQP
 
 import Data.ByteString.Lazy.Char8 as BL
+import Data.Map (Map)
+import qualified Data.Map as Map
+import Data.Word
 
 import Control.Concurrent (threadDelay)
+import Control.Concurrent.STM
 
 main :: IO ()
 main = hspec spec
@@ -60,3 +64,66 @@
 
                 _ <- deleteQueue ch q
                 closeConnection conn
+        context "confirmSelect" $ do
+            it "receives a confirmation message" $ do
+                let q = "haskell-amqp.queues.publish-over-fanout1"
+                    e = "haskell-amqp.fanout.d.na"
+                conn <- openConnection "127.0.0.1" "/" "guest" "guest"
+                ch   <- openChannel conn
+                confirmSelect ch True
+                (confirmMap, counter) <- atomically $ (,) <$> newTVar Map.empty <*> newTVar 0
+                addConfirmationListener ch (handleConfirms counter confirmMap)
+                _    <- declareExchange ch (newExchange {exchangeName = e,
+                                                         exchangeType = "fanout",
+                                                         exchangeDurable = True})
+
+                (_, _, _) <- declareQueue ch (newQueue {queueName = q, queueDurable = False})
+                _ <- purgeQueue ch q
+                bindQueue ch q e ""
+
+
+                _ <- traverse (\n -> do
+                                  sn' <- publishMsg ch e "" (newMsg {msgBody = (BL.pack "hello")})
+                                  case sn' of
+                                    Just sn -> atomically $ addSequenceNumber confirmMap (fromIntegral sn) n
+                                    Nothing -> return ()
+
+                              ) [1..5]
+
+
+                threadDelay (1000 * 100)
+
+                (_, n, _) <- declareQueue ch (newQueue {queueName = q, queueDurable = False})
+                n `shouldBe` 5
+
+                cMap' <- atomically $ readTVar confirmMap
+                cMap' `shouldBe` Map.empty
+
+                counter' <- atomically $ readTVar counter
+                counter' `shouldBe` 5
+
+                _ <- deleteQueue ch q
+                closeConnection conn
+
+
+addSequenceNumber :: TVar (Map Word64 Int) -> Word64 -> Int -> STM ()
+addSequenceNumber cMap sn n = modifyTVar' cMap (Map.insert sn n)
+
+removeSequenceNumber :: TVar (Map Word64 Int) -> Word64 -> STM ()
+removeSequenceNumber cMap sn = modifyTVar' cMap (Map.delete sn)
+
+increaseCounter :: TVar Int -> STM ()
+increaseCounter n = modifyTVar' n (+1)
+
+handleConfirms :: TVar Int -> TVar (Map Word64 Int) -> (Word64, Bool, AckType) -> IO ()
+handleConfirms c _ (_, False, BasicNack) = atomically $ increaseCounter c
+handleConfirms c _ (_, True, BasicNack) = atomically $ increaseCounter c
+handleConfirms c cMap (n, False, BasicAck) = atomically $ removeSequenceNumber cMap n >> increaseCounter c
+handleConfirms c cMap (n, True, BasicAck) = atomically $ do
+  cMap' <- readTVar cMap
+  let (lt, eq', _) = Map.splitLookup n cMap'
+  case eq' of
+    Just _ -> removeSequenceNumber cMap n >> increaseCounter c
+    Nothing -> return ()
+  _ <- traverse (\i -> removeSequenceNumber cMap i >> increaseCounter c) (Map.keys lt)
+  return ()


Reply via email to