{-# LANGUAGE BangPatterns, DeriveDataTypeable, OverloadedStrings, ScopedTypeVariables, TupleSections #-}
module Network.AMQP (
Connection,
ConnectionOpts(..),
TLSSettings(..),
defaultConnectionOpts,
openConnection,
openConnection',
openConnection'',
closeChannel,
closeConnection,
addConnectionClosedHandler,
addConnectionBlockedHandler,
getServerProperties,
Channel,
openChannel,
addReturnListener,
addChannelExceptionHandler,
isNormalChannelClose,
qos,
ExchangeOpts(..),
newExchange,
declareExchange,
bindExchange,
bindExchange',
unbindExchange,
unbindExchange',
deleteExchange,
QueueOpts(..),
newQueue,
declareQueue,
bindQueue,
bindQueue',
unbindQueue,
unbindQueue',
purgeQueue,
deleteQueue,
Message(..),
DeliveryMode(..),
PublishError(..),
ReturnReplyCode(..),
newMsg,
Envelope(..),
ConsumerTag,
Ack(..),
consumeMsgs,
consumeMsgs',
cancelConsumer,
publishMsg,
publishMsg',
getMsg,
rejectMsg,
rejectEnv,
recoverMsgs,
ackMsg,
ackEnv,
nackMsg,
nackEnv,
txSelect,
txCommit,
txRollback,
confirmSelect,
waitForConfirms,
waitForConfirmsUntil,
addConfirmationListener,
ConfirmationResult(..),
AckType(..),
flow,
SASLMechanism(..),
plain,
amqplain,
rabbitCRdemo,
AMQPException(..),
ChanThreadKilledException,
CloseType(..),
fromURI
) where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad(when)
import Data.List.Split (splitOn)
import Data.Binary
import Data.Binary.Put
import Network.Socket (PortNumber)
import Network.URI (unEscapeString)
import Data.Text (Text)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString as BS
import qualified Data.IntSet as IntSet
import qualified Data.Text.Encoding as E
import qualified Data.Map as M
import qualified Data.Text as T
import Network.AMQP.Types
import Network.AMQP.Generated
import Network.AMQP.Internal
import Network.AMQP.Helpers
import Text.Read (readMaybe)
-- | Options describing an exchange; consumed by 'declareExchange'.
--
-- 'newExchange' supplies a value with default settings that can be adjusted
-- with record-update syntax.
data ExchangeOpts = ExchangeOpts {
    exchangeName :: Text,          -- ^ name of the exchange ('newExchange' default: empty)
    exchangeType :: Text,          -- ^ type of the exchange ('newExchange' default: empty)
    exchangePassive :: Bool,       -- ^ 'newExchange' default: 'False'
    exchangeDurable :: Bool,       -- ^ 'newExchange' default: 'True'
    exchangeAutoDelete :: Bool,    -- ^ 'newExchange' default: 'False'
    exchangeInternal :: Bool,      -- ^ 'newExchange' default: 'False'
    exchangeArguments :: FieldTable -- ^ extra arguments sent with the declaration ('newExchange' default: empty table)
} deriving (Eq, Ord, Read, Show)
-- | An 'ExchangeOpts' value holding the default settings.
--
-- The name and type are left empty here; override them (and any other field)
-- with record-update syntax before passing the value to 'declareExchange'.
newExchange :: ExchangeOpts
newExchange = ExchangeOpts
    ""                     -- exchangeName
    ""                     -- exchangeType
    False                  -- exchangePassive
    True                   -- exchangeDurable
    False                  -- exchangeAutoDelete
    False                  -- exchangeInternal
    (FieldTable M.empty)   -- exchangeArguments
-- | Sends an @Exchange_declare@ request built from the given 'ExchangeOpts'
-- over the channel and expects @Exchange_declare_ok@ as the reply.
--
-- A reply of any other shape fails the pattern match in the @do@ block,
-- which raises an exception in 'IO'.
--
-- NOTE(review): the per-argument labels below follow the AMQP 0-9-1
-- @exchange.declare@ field order; confirm against "Network.AMQP.Generated".
declareExchange :: Channel -> ExchangeOpts -> IO ()
declareExchange chan exchg = do
    (SimpleMethod Exchange_declare_ok) <- request chan (SimpleMethod (Exchange_declare
        1                                   -- ticket
        (ShortString $ exchangeName exchg)  -- exchange
        (ShortString $ exchangeType exchg)  -- type
        (exchangePassive exchg)             -- passive
        (exchangeDurable exchg)             -- durable
        (exchangeAutoDelete exchg)          -- auto-delete
        (exchangeInternal exchg)            -- internal
        False                               -- no-wait
        (exchangeArguments exchg)))         -- arguments
    return ()
-- | @bindExchange chan destinationName sourceName routingKey@ is
-- 'bindExchange'' with an empty argument table.
bindExchange :: Channel -> Text -> Text -> Text -> IO ()
bindExchange chan destinationName sourceName routingKey =
    bindExchange' chan destinationName sourceName routingKey (FieldTable M.empty)
-- | Like 'bindExchange', but additionally takes the argument table that is
-- sent with the request.
--
-- Sends @Exchange_bind@ over the channel and expects @Exchange_bind_ok@ as
-- the reply; any other reply fails the pattern match and raises an
-- exception in 'IO'.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @exchange.bind@
-- field order; confirm against "Network.AMQP.Generated".
bindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
bindExchange' chan destinationName sourceName routingKey args = do
    (SimpleMethod Exchange_bind_ok) <- request chan (SimpleMethod (Exchange_bind
        1                              -- ticket
        (ShortString destinationName)  -- destination
        (ShortString sourceName)       -- source
        (ShortString routingKey)       -- routing key
        False                          -- no-wait
        args                           -- arguments
        ))
    return ()
-- | @unbindExchange chan destinationName sourceName routingKey@ is
-- 'unbindExchange'' with an empty argument table.
unbindExchange :: Channel -> Text -> Text -> Text -> IO ()
unbindExchange chan destinationName sourceName routingKey =
    unbindExchange' chan destinationName sourceName routingKey (FieldTable M.empty)
-- | Like 'unbindExchange', but additionally takes the argument table that is
-- sent with the request.
--
-- Sends @Exchange_unbind@ over the channel and expects @Exchange_unbind_ok@
-- as the reply; any other reply fails the pattern match and raises an
-- exception in 'IO'.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @exchange.unbind@
-- field order; confirm against "Network.AMQP.Generated".
unbindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
unbindExchange' chan destinationName sourceName routingKey args = do
    SimpleMethod Exchange_unbind_ok <- request chan $ SimpleMethod $ Exchange_unbind
        1                              -- ticket
        (ShortString destinationName)  -- destination
        (ShortString sourceName)       -- source
        (ShortString routingKey)       -- routing key
        False                          -- no-wait
        args                           -- arguments
    return ()
-- | Sends @Exchange_delete@ for the named exchange and expects
-- @Exchange_delete_ok@ as the reply; any other reply fails the pattern match
-- and raises an exception in 'IO'.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @exchange.delete@
-- field order; confirm against "Network.AMQP.Generated".
deleteExchange :: Channel -> Text -> IO ()
deleteExchange chan exchange = do
    (SimpleMethod Exchange_delete_ok) <- request chan (SimpleMethod (Exchange_delete
        1                       -- ticket
        (ShortString exchange)  -- exchange
        False                   -- if-unused
        False                   -- no-wait
        ))
    return ()
-- | Options describing a queue; consumed by 'declareQueue'.
--
-- 'newQueue' supplies a value with default settings that can be adjusted
-- with record-update syntax.
data QueueOpts = QueueOpts {
    queueName :: Text,        -- ^ name of the queue ('newQueue' default: empty)
    queuePassive :: Bool,     -- ^ 'newQueue' default: 'False'
    queueDurable :: Bool,     -- ^ 'newQueue' default: 'True'
    queueExclusive :: Bool,   -- ^ 'newQueue' default: 'False'
    queueAutoDelete :: Bool,  -- ^ 'newQueue' default: 'False'
    queueHeaders :: FieldTable -- ^ table sent as the last argument of the declaration ('newQueue' default: empty table)
} deriving (Eq, Ord, Read, Show)
-- | A 'QueueOpts' value holding the default settings.
--
-- The name is left empty here; override it (and any other field) with
-- record-update syntax before passing the value to 'declareQueue'.
newQueue :: QueueOpts
newQueue = QueueOpts
    ""                     -- queueName
    False                  -- queuePassive
    True                   -- queueDurable
    False                  -- queueExclusive
    False                  -- queueAutoDelete
    (FieldTable M.empty)   -- table read back through queueHeaders by declareQueue
-- | Sends a @Queue_declare@ request built from the given 'QueueOpts' and
-- expects @Queue_declare_ok@ as the reply.
--
-- Returns @(queueName, messageCount, consumerCount)@ exactly as carried by
-- the reply; the two counts are converted from 'Word32' to 'Int'. A reply of
-- any other shape fails the pattern match and raises an exception in 'IO'.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @queue.declare@
-- field order; confirm against "Network.AMQP.Generated".
declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int)
declareQueue chan queue = do
    SimpleMethod (Queue_declare_ok (ShortString qName) messageCount consumerCount) <- request chan $ SimpleMethod $ Queue_declare
        1                                -- ticket
        (ShortString $ queueName queue)  -- queue
        (queuePassive queue)             -- passive
        (queueDurable queue)             -- durable
        (queueExclusive queue)           -- exclusive
        (queueAutoDelete queue)          -- auto-delete
        False                            -- no-wait
        (queueHeaders queue)             -- arguments
    return (qName, fromIntegral messageCount, fromIntegral consumerCount)
-- | @bindQueue chan queue exchange routingKey@ is 'bindQueue'' with an empty
-- argument table.
bindQueue :: Channel -> Text -> Text -> Text -> IO ()
bindQueue chan queue exchange routingKey =
    bindQueue' chan queue exchange routingKey (FieldTable M.empty)
-- | Like 'bindQueue', but additionally takes the argument table that is sent
-- with the request.
--
-- Sends @Queue_bind@ over the channel and expects @Queue_bind_ok@ as the
-- reply; any other reply fails the pattern match and raises an exception in
-- 'IO'.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @queue.bind@
-- field order; confirm against "Network.AMQP.Generated".
bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
bindQueue' chan queue exchange routingKey args = do
    (SimpleMethod Queue_bind_ok) <- request chan (SimpleMethod (Queue_bind
        1                         -- ticket
        (ShortString queue)       -- queue
        (ShortString exchange)    -- exchange
        (ShortString routingKey)  -- routing key
        False                     -- no-wait
        args                      -- arguments
        ))
    return ()
-- | @unbindQueue chan queue exchange routingKey@ is 'unbindQueue'' with an
-- empty argument table.
unbindQueue :: Channel -> Text -> Text -> Text -> IO ()
unbindQueue chan queue exchange routingKey =
    unbindQueue' chan queue exchange routingKey (FieldTable M.empty)
-- | Like 'unbindQueue', but additionally takes the argument table that is
-- sent with the request.
--
-- Sends @Queue_unbind@ over the channel and expects @Queue_unbind_ok@ as the
-- reply; any other reply fails the pattern match and raises an exception in
-- 'IO'. Unlike @Queue_bind@, this constructor takes no boolean flag.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @queue.unbind@
-- field order; confirm against "Network.AMQP.Generated".
unbindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
unbindQueue' chan queue exchange routingKey args = do
    SimpleMethod Queue_unbind_ok <- request chan $ SimpleMethod $ Queue_unbind
        1                         -- ticket
        (ShortString queue)       -- queue
        (ShortString exchange)    -- exchange
        (ShortString routingKey)  -- routing key
        args                      -- arguments
    return ()
-- | Sends @Queue_purge@ for the named queue and returns the message count
-- carried by the @Queue_purge_ok@ reply (presumably the number of messages
-- removed -- confirm against the AMQP specification).
--
-- A reply of any other shape fails the pattern match and raises an
-- exception in 'IO'.
purgeQueue :: Channel -> Text -> IO Word32
purgeQueue chan queue = do
    SimpleMethod (Queue_purge_ok msgCount) <- request chan $ SimpleMethod $ Queue_purge
        1                    -- ticket (label assumed from AMQP 0-9-1 queue.purge)
        (ShortString queue)  -- queue
        False                -- no-wait (label assumed)
    return msgCount
-- | Sends @Queue_delete@ for the named queue and returns the message count
-- carried by the @Queue_delete_ok@ reply (presumably the number of messages
-- that were in the queue -- confirm against the AMQP specification).
--
-- A reply of any other shape fails the pattern match and raises an
-- exception in 'IO'.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @queue.delete@
-- field order; confirm against "Network.AMQP.Generated".
deleteQueue :: Channel -> Text -> IO Word32
deleteQueue chan queue = do
    SimpleMethod (Queue_delete_ok msgCount) <- request chan $ SimpleMethod $ Queue_delete
        1                    -- ticket
        (ShortString queue)  -- queue
        False                -- if-unused
        False                -- if-empty
        False                -- no-wait
    return msgCount
-- | A 'Message' with an empty lazy-'BL.ByteString' body and every optional
-- property set to 'Nothing'.
--
-- Intended as a starting point: set the fields you need with record-update
-- syntax.
newMsg :: Message
newMsg = Message
    BL.empty                                 -- body
    Nothing Nothing Nothing Nothing Nothing  -- optional properties 1-5
    Nothing Nothing Nothing Nothing Nothing  -- optional properties 6-10
    Nothing Nothing Nothing Nothing          -- optional properties 11-14
-- | Selects whether deliveries are to be acknowledged.
--
-- Passed to 'consumeMsgs', 'consumeMsgs'' and 'getMsg', where it is turned
-- into a protocol flag by @ackToBool@ ('Ack' maps to 'False', 'NoAck' to
-- 'True').
data Ack = Ack | NoAck
    deriving (Eq, Ord, Read, Show)
-- | Converts an 'Ack' into the boolean flag placed in @Basic_consume@ and
-- @Basic_get@ requests.
--
-- The result is 'True' exactly when acknowledgements are /not/ requested
-- ('NoAck'), and 'False' for 'Ack'.
ackToBool :: Ack -> Bool
ackToBool Ack = False
ackToBool NoAck = True
-- | @consumeMsgs chan queue ack callback@ registers @callback@ as a consumer
-- of @queue@ and returns the consumer tag generated for it.
--
-- This is 'consumeMsgs'' with a cancellation callback that does nothing and
-- an empty argument table.
consumeMsgs :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag
consumeMsgs chan queue ack callback =
    consumeMsgs' chan queue ack callback (\_ -> return ()) (FieldTable M.empty)
-- | Like 'consumeMsgs', but additionally takes a callback keyed to the
-- consumer's cancellation and the argument table sent with the request.
--
-- Steps, in order:
--
-- 1. Increments the channel's @lastConsumerTag@ counter and uses the
--    decimal rendering of the new value as the consumer tag.
-- 2. Stores @(callback, cancelCB)@ under that tag in the channel's
--    @consumers@ map.
-- 3. Writes a @Basic_consume@ method with 'writeAssembly'; no reply is read
--    here.
--
-- Returns the generated consumer tag.
--
-- NOTE(review): per-argument labels follow the AMQP 0-9-1 @basic.consume@
-- field order; confirm against "Network.AMQP.Generated".
consumeMsgs' :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> (ConsumerTag -> IO ()) -> FieldTable -> IO ConsumerTag
consumeMsgs' chan queue ack callback cancelCB args = do
    -- generate a new consumer tag
    newConsumerTag <- fmap (T.pack . show) $ modifyMVar (lastConsumerTag chan) $ \c -> return (c+1, c+1)

    -- register the consumer before the request goes out
    modifyMVar_ (consumers chan) $ return . M.insert newConsumerTag (callback, cancelCB)

    writeAssembly chan (SimpleMethod $ Basic_consume
        1                             -- ticket
        (ShortString queue)           -- queue
        (ShortString newConsumerTag)  -- consumer tag
        False                         -- no-local
        (ackToBool ack)               -- no-ack
        False                         -- exclusive
        True                          -- no-wait
        args                          -- arguments
        )
    return newConsumerTag
-- | Sends @Basic_cancel@ for the given consumer tag, expects
-- @Basic_cancel_ok@ as the reply, and then removes the tag from the
-- channel's @consumers@ map.
--
-- A reply of any other shape fails the pattern match and raises an
-- exception in 'IO'; in that case the map entry is left in place.
cancelConsumer :: Channel -> ConsumerTag -> IO ()
cancelConsumer chan consumerTag = do
    SimpleMethod (Basic_cancel_ok _) <- request chan $ SimpleMethod $ Basic_cancel
        (ShortString consumerTag)  -- consumer tag
        False                      -- no-wait (label assumed from AMQP 0-9-1 basic.cancel)

    -- unregister the consumer
    modifyMVar_ (consumers chan) $ return . M.delete consumerTag
-- | @publishMsg chan exchange routingKey msg@ publishes @msg@; it is
-- 'publishMsg'' with the @mandatory@ flag set to 'False'.
--
-- The result is whatever 'publishMsg'' returns for the same publish.
publishMsg :: Channel -> Text -> Text -> Message -> IO (Maybe Int)
publishMsg chan exchange routingKey msg = publishMsg' chan exchange routingKey False msg
-- | Like 'publishMsg', but lets the caller choose the @mandatory@ flag.
--
-- The whole publish runs inside 'modifyMVar' on the channel's
-- @nextPublishSeqNum@ counter:
--
-- * A @ContentMethod@ made of @Basic_publish@, a @CHBasic@ property header
--   built from the message's fields, and the message body is written with
--   'writeAssembly'.
-- * If the counter is non-zero, its value is inserted into the channel's
--   @unconfirmedSet@, the counter is advanced by one, and @Just@ the
--   sequence number used for this publish is returned.
-- * If the counter is zero, it stays zero and 'Nothing' is returned.
--
-- NOTE(review): presumably the counter only becomes non-zero once confirm
-- mode has been enabled on the channel; that code is not shown here, so
-- confirm. The labels on the @Basic_publish@ arguments follow the AMQP 0-9-1
-- @basic.publish@ field order (assumed).
publishMsg' :: Channel -> Text -> Text -> Bool -> Message -> IO (Maybe Int)
publishMsg' chan exchange routingKey mandatory msg =
    modifyMVar (nextPublishSeqNum chan) $ \nxtSeqNum -> do
        writeAssembly chan $ ContentMethod
            (Basic_publish
                1                         -- ticket
                (ShortString exchange)    -- exchange
                (ShortString routingKey)  -- routing key
                mandatory                 -- mandatory
                False                     -- immediate
                )
            (CHBasic
                (ShortString <$> msgContentType msg)
                (ShortString <$> msgContentEncoding msg)
                (msgHeaders msg)
                (deliveryModeToInt <$> msgDeliveryMode msg)
                (msgPriority msg)
                (ShortString <$> msgCorrelationID msg)
                (ShortString <$> msgReplyTo msg)
                (ShortString <$> msgExpiration msg)
                (ShortString <$> msgID msg)
                (msgTimestamp msg)
                (ShortString <$> msgType msg)
                (ShortString <$> msgUserID msg)
                (ShortString <$> msgApplicationID msg)
                (ShortString <$> msgClusterID msg)
                )
            (msgBody msg)

        if nxtSeqNum /= 0
            then do
                -- remember this publish as not yet confirmed
                atomically $ modifyTVar' (unconfirmedSet chan) $ \uSet -> IntSet.insert nxtSeqNum uSet
                return (succ nxtSeqNum, Just nxtSeqNum)
            else return (0, Nothing)
-- | @getMsg chan ack queue@ sends a @Basic_get@ request for @queue@ and
-- inspects the reply.
--
-- * If the reply is a @ContentMethod@ carrying @Basic_get_ok@, returns
--   @Just (message, envelope)@: the message is built with
--   @msgFromContentHeaderProperties@ from the reply's properties and body,
--   and the envelope records the delivery tag, redelivered flag, exchange
--   name, routing key and this channel.
-- * Any other reply yields 'Nothing' (presumably the server's
--   \"queue is empty\" answer -- confirm).
--
-- The last field of @Basic_get_ok@ is ignored.
getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
getMsg chan ack queue = do
    ret <- request chan $ SimpleMethod $ Basic_get
        1                    -- ticket (label assumed from AMQP 0-9-1 basic.get)
        (ShortString queue)  -- queue
        (ackToBool ack)      -- no-ack
    case ret of
        ContentMethod (Basic_get_ok deliveryTag redelivered (ShortString exchange) (ShortString routingKey) _) properties body ->
            return $ Just (msgFromContentHeaderProperties properties body,
                           Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered,
                                     envExchangeName = exchange, envRoutingKey = routingKey, envChannel = chan})
        _ -> return Nothing
-- | @ackMsg chan deliveryTag multiple@ writes a @Basic_ack@ method carrying
-- @deliveryTag@ and the @multiple@ flag to @chan@.
--
-- The method is sent with 'writeAssembly'; no reply is read here.
ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
ackMsg chan deliveryTag multiple = writeAssembly chan (SimpleMethod ackMethod)
  where
    ackMethod = Basic_ack deliveryTag multiple
-- | Acknowledges the single message described by an 'Envelope'.
--
-- Convenience wrapper around 'ackMsg' that takes the channel and delivery
-- tag from the envelope and always passes @multiple = False@.
ackEnv :: Envelope -> IO ()
ackEnv env = ackMsg chan tag False
  where
    chan = envChannel env
    tag = envDeliveryTag env
-- | @nackMsg chan deliveryTag multiple requeue@ writes a @Basic_nack@ method
-- with the given delivery tag and flags to @chan@.
--
-- The method is sent with 'writeAssembly'; no reply is read here.
nackMsg :: Channel -> LongLongInt -> Bool -> Bool -> IO ()
nackMsg chan deliveryTag multiple requeue = writeAssembly chan (SimpleMethod nackMethod)
  where
    nackMethod = Basic_nack deliveryTag multiple requeue
-- | Negatively acknowledges the single message described by an 'Envelope'.
--
-- Convenience wrapper around 'nackMsg' that takes the channel and delivery
-- tag from the envelope and passes @False@ for both @multiple@ and
-- @requeue@.
nackEnv :: Envelope -> IO ()
nackEnv env = nackMsg chan tag False False
  where
    chan = envChannel env
    tag = envDeliveryTag env
-- | @rejectMsg chan deliveryTag requeue@ writes a @Basic_reject@ method
-- carrying @deliveryTag@ and the @requeue@ flag to @chan@.
--
-- The method is sent with 'writeAssembly'; no reply is read here.
rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
rejectMsg chan deliveryTag requeue = writeAssembly chan (SimpleMethod rejectMethod)
  where
    rejectMethod = Basic_reject deliveryTag requeue
-- | Rejects the message described by an 'Envelope'.
--
-- Convenience wrapper around 'rejectMsg' that takes the channel and delivery
-- tag from the envelope.
rejectEnv :: Envelope
          -> Bool -- ^ requeue flag, forwarded unchanged to 'rejectMsg'
          -> IO ()
rejectEnv env requeue = rejectMsg chan tag requeue
  where
    chan = envChannel env
    tag = envDeliveryTag env
-- | @recoverMsgs chan requeue@ sends a @Basic_recover@ request with the
-- given @requeue@ flag and waits for the reply.
--
-- The reply must be @SimpleMethod Basic_recover_ok@; any other reply makes
-- the refutable pattern bind below fail, which raises an exception in 'IO'.
recoverMsgs :: Channel -> Bool -> IO ()
recoverMsgs chan requeue = do
    SimpleMethod Basic_recover_ok <- request chan (SimpleMethod (Basic_recover requeue))
    pure ()
-- | Sends @Tx_select@ on the channel and waits for the reply.
--
-- The reply must be @SimpleMethod Tx_select_ok@; any other reply makes the
-- refutable pattern bind fail, which raises an exception in 'IO'.
txSelect :: Channel -> IO ()
txSelect chan = do
    SimpleMethod Tx_select_ok <- request chan (SimpleMethod Tx_select)
    pure ()
-- | Sends @Tx_commit@ on the channel and waits for the reply.
--
-- The reply must be @SimpleMethod Tx_commit_ok@; any other reply makes the
-- refutable pattern bind fail, which raises an exception in 'IO'.
txCommit :: Channel -> IO ()
txCommit chan = do
    SimpleMethod Tx_commit_ok <- request chan (SimpleMethod Tx_commit)
    pure ()
-- | Sends @Tx_rollback@ on the channel and waits for the reply.
--
-- The reply must be @SimpleMethod Tx_rollback_ok@; any other reply makes the
-- refutable pattern bind fail, which raises an exception in 'IO'.
txRollback :: Channel -> IO ()
txRollback chan = do
    SimpleMethod Tx_rollback_ok <- request chan (SimpleMethod Tx_rollback)
    pure ()
-- | @confirmSelect chan nowait@ sends @Confirm_select@ on the channel.
--
-- Before the method is sent, the channel's 'nextPublishSeqNum' counter is
-- set to 1 if it is currently 0 and left untouched otherwise.
--
-- With @nowait = True@ the method is only written to the channel. With
-- @nowait = False@ the call waits for the reply, which must be
-- @SimpleMethod Confirm_select_ok@ (any other reply makes the refutable
-- pattern bind fail, raising an exception in 'IO').
confirmSelect :: Channel -> Bool -> IO ()
confirmSelect chan nowait = do
    modifyMVar_ (nextPublishSeqNum chan) (pure . startCounting)
    sendSelect
  where
    -- A counter value of 0 is replaced by 1; any other value is kept.
    startCounting 0 = 1
    startCounting n = n

    -- The flag inside the method equals 'nowait' in both branches.
    selectMethod = SimpleMethod (Confirm_select nowait)

    sendSelect
        | nowait = writeAssembly chan selectMethod
        | otherwise = do
            SimpleMethod Confirm_select_ok <- request chan selectMethod
            pure ()
-- | Runs 'waitForAllConfirms' for the channel in a single STM transaction
-- and wraps its pair of sets in 'Complete'.
waitForConfirms :: Channel -> IO ConfirmationResult
waitForConfirms chan = atomically (fmap Complete (waitForAllConfirms chan))
-- | Like 'waitForConfirms', but gives up once a delay has elapsed.
--
-- @timeout@ is handed to 'registerDelay' (which takes microseconds). The
-- transaction first tries 'waitForAllConfirms' and returns 'Complete' if
-- that succeeds. Otherwise, once the delay has expired, it returns 'Partial'
-- with the current acked and nacked sets (both reset to empty in the same
-- transaction) and the still-unconfirmed set, which is only read.
waitForConfirmsUntil :: Channel -> Int -> IO ConfirmationResult
waitForConfirmsUntil chan timeout = do
    timedOut <- registerDelay timeout
    atomically (allConfirmed `orElse` snapshotAfter timedOut)
  where
    allConfirmed :: STM ConfirmationResult
    allConfirmed = Complete <$> waitForAllConfirms chan

    -- Blocks (via 'retry') until the delay flag flips to True.
    snapshotAfter :: TVar Bool -> STM ConfirmationResult
    snapshotAfter timedOut = do
        expired <- readTVar timedOut
        if expired
            then do
                acked <- swapTVar (ackedSet chan) IntSet.empty
                nacked <- swapTVar (nackedSet chan) IntSet.empty
                pending <- readTVar (unconfirmedSet chan)
                pure (Partial (acked, nacked, pending))
            else retry
-- | Registers a confirmation handler on the channel by prepending it to the
-- list held in 'confirmListeners'.
--
-- Handlers already in the list are kept; the new one becomes the head.
addConfirmationListener :: Channel -> ((Word64, Bool, AckType) -> IO ()) -> IO ()
addConfirmationListener chan handler =
    modifyMVar_ (confirmListeners chan) (pure . (handler :))
-- | @flow chan active@ sends @Channel_flow active@ and waits for the reply.
--
-- The reply must be @SimpleMethod (Channel_flow_ok _)@; the flag it carries
-- is ignored. Any other reply makes the refutable pattern bind fail, which
-- raises an exception in 'IO'.
flow :: Channel -> Bool -> IO ()
flow chan active = do
    SimpleMethod (Channel_flow_ok _) <- request chan (SimpleMethod (Channel_flow active))
    pure ()
-- | Default connection options: a single server @localhost:5672@, virtual
-- host @\"/\"@, and 'plain' authentication as @guest@ / @guest@.
--
-- The remaining fields are given positionally, in constructor order.
defaultConnectionOpts :: ConnectionOpts
defaultConnectionOpts =
    ConnectionOpts
        [("localhost", 5672)]      -- servers (host, port)
        "/"                        -- virtual host
        [plain "guest" "guest"]    -- SASL mechanisms
        (Just 131072)              -- Maybe Word32; presumably the max frame size (TODO confirm)
        Nothing
        Nothing
        Nothing                    -- no TLS settings
        Nothing
-- | @openConnection host vhost loginName loginPassword@ opens a connection
-- on port 5672 by delegating to @openConnection'@.
openConnection :: String -> Text -> Text -> Text -> IO Connection
openConnection host vhost loginName loginPassword =
    openConnection' host 5672 vhost loginName loginPassword
-- | @openConnection' host port vhost loginName loginPassword@ opens a
-- connection to a single server.
--
-- Starts from 'defaultConnectionOpts', overrides the server list, virtual
-- host and authentication ('plain' with the given credentials), and passes
-- the result to @openConnection''@.
openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection
openConnection' host port vhost loginName loginPassword = openConnection'' opts
  where
    opts = defaultConnectionOpts
        { coServers = [(host, port)]
        , coVHost = vhost
        , coAuth = [plain loginName loginPassword]
        }
-- | The @PLAIN@ SASL mechanism for the given login name and password.
--
-- The initial response is the UTF-8 encoding of a NUL character, the login
-- name, another NUL character and the password. No challenge handler is
-- supplied.
plain :: Text -> Text -> SASLMechanism
plain loginName loginPassword = SASLMechanism "PLAIN" initialResponse Nothing
  where
    separator = T.singleton '\0'
    initialResponse =
        E.encodeUtf8 (T.concat [separator, loginName, separator, loginPassword])
-- | The @AMQPLAIN@ SASL mechanism for the given login name and password.
--
-- The initial response is a serialized 'FieldTable' with the keys @LOGIN@
-- and @PASSWORD@ (values UTF-8 encoded), minus its first four bytes. No
-- challenge handler is supplied.
amqplain :: Text -> Text -> SASLMechanism
amqplain loginName loginPassword = SASLMechanism "AMQPLAIN" initialResponse Nothing
  where
    utf8Field = FVString . E.encodeUtf8

    credentials =
        FieldTable
            (M.fromList
                [ ("LOGIN", utf8Field loginName)
                , ("PASSWORD", utf8Field loginPassword)
                ])

    -- The first four bytes of the encoded table are discarded; presumably
    -- they are the table's length prefix (TODO confirm against the
    -- 'FieldTable' Binary instance).
    initialResponse = toStrict (BL.drop 4 (runPut (put credentials)))
-- | The @RABBIT-CR-DEMO@ SASL mechanism for the given login name and
-- password.
--
-- The initial response is the UTF-8 encoded login name. The challenge
-- handler ignores the challenge it receives and always answers with
-- @\"My password is \"@ followed by the UTF-8 encoded password.
rabbitCRdemo :: Text -> Text -> SASLMechanism
rabbitCRdemo loginName loginPassword =
    SASLMechanism "RABBIT-CR-DEMO" (E.encodeUtf8 loginName) (Just answerChallenge)
  where
    answerChallenge _ = pure passwordReply
    passwordReply =
        BS.concat [E.encodeUtf8 "My password is ", E.encodeUtf8 loginPassword]
-- | @qos chan prefetchSize prefetchCount global@ sends a @Basic_qos@ request
-- with the three given values and waits for the reply.
--
-- The reply must be @SimpleMethod Basic_qos_ok@; any other reply makes the
-- refutable pattern bind fail, which raises an exception in 'IO'.
qos :: Channel -> Word32 -> Word16 -> Bool -> IO ()
qos chan prefetchSize prefetchCount global = do
    SimpleMethod Basic_qos_ok <- request chan (SimpleMethod qosMethod)
    pure ()
  where
    qosMethod = Basic_qos prefetchSize prefetchCount global
-- | Parses an AMQP URI into 'ConnectionOpts'.
--
-- The URI is split by @fromURI'@. On success the result is
-- 'defaultConnectionOpts' with the servers, virtual host and 'plain'
-- credentials taken from the URI, and TLS set to @Just TLSTrusted@ when the
-- parser reports TLS (otherwise 'Nothing'). A parse error is passed through
-- unchanged as @Left@.
fromURI :: String -> Either String ConnectionOpts
fromURI uri = toOpts <$> fromURI' uri
  where
    toOpts (hostPorts, uid, pw, vhost, tls) =
        defaultConnectionOpts
            { coServers = hostPorts
            , coVHost = T.pack vhost
            , coAuth = [plain (T.pack uid) (T.pack pw)]
            , coTLSSettings = if tls then Just TLSTrusted else Nothing
            }
-- | Splits an AMQP URI of the form
-- @scheme:\/\/user:password\@host1:port1,host2:port2\/vhost@ into
-- @(servers, user, password, vhost, useTLS)@.
--
-- * The user, password and vhost are percent-decoded with 'unEscapeString'.
-- * An empty user or password defaults to @\"guest\"@; an empty vhost
--   defaults to @\"/\"@.
-- * The scheme @amqps@ selects TLS and a default port of 5671; every other
--   scheme means no TLS and a default port of 5672.
-- * Each comma-separated server entry is parsed by 'fromHostPort'; the first
--   entry that fails to parse makes the whole result @Left@.
--
-- The credentials part is optional: @amqp:\/\/host:5673\/vh@ is read as host
-- @host@, port 5673, vhost @vh@ with the default credentials. (Previously
-- such a URI was misread: the host became the user name, the rest became the
-- password, and the server silently fell back to @localhost@.)
fromURI' :: String -> Either String ([(String, PortNumber)], String, String, String, Bool)
fromURI' uri = do
    servers <- mapM (fromHostPort dport) hostPorts
    Right (servers, unEscapeString uid, unEscapeString pw, unEscapeString vhost, tls)
  where
    -- pre: "scheme[:[//]user[:password]]", suf: "hosts[/vhost]"
    (pre, suf) = case splitOn "@" uri of
        (p : s : _) -> (p, s)
        _ -> case break (== ':') uri of
            -- No '@': everything after "scheme://" is the host list and vhost.
            (scheme, ':' : '/' : '/' : authority) -> (scheme, authority)
            -- Neither '@' nor "://": keep the historical interpretation.
            _ -> (uri, "")

    -- Padding with "::" guarantees at least three fields.
    (pro, uid', pw') = case splitOn ":" (pre ++ "::") of
        (p : u : w : _) -> (p, u, w)
        _ -> (pre, "", "")

    (hnp, thost) = case splitOn "/" suf of
        (h : v : _) -> (h, v)
        (h : _) -> (h, "")
        [] -> ("", "")

    hostPorts :: [String]
    hostPorts = splitOn "," hnp

    vhost :: String
    vhost = if null thost then "/" else thost

    tls :: Bool
    tls = pro == "amqps"

    dport :: PortNumber
    dport = if tls then 5671 else 5672

    -- The user field still carries the "//" that follows the scheme. Strip
    -- it BEFORE testing for emptiness, so that "amqp://@host" and
    -- "amqp://:pw@host" fall back to "guest" rather than to an empty user.
    uid :: String
    uid = if null strippedUid then "guest" else strippedUid
      where
        strippedUid = dropWhile (== '/') uid'

    pw :: String
    pw = if null pw' then "guest" else pw'
-- | @fromHostPort defPort hostPort@ parses a single @host[:port]@ entry.
--
-- * The host is percent-decoded with 'unEscapeString'; an empty host
--   defaults to @\"localhost\"@.
-- * A missing or empty port yields @defPort@.
-- * A port that is not an integer literal, or lies outside @0..65535@,
--   yields @Left \"invalid port number: ...\"@.
--
-- The port is parsed as an 'Integer' and range-checked before conversion.
-- Reading it directly as a 'PortNumber' (a 16-bit value) would let
-- out-of-range or negative input wrap around to a different, valid-looking
-- port instead of being rejected.
fromHostPort :: PortNumber -> String -> Either String (String, PortNumber)
fromHostPort defPort hostPort = (unEscapeString host, ) <$> nport
  where
    (hst', port) = case splitOn ":" hostPort of
        (h : p : _) -> (h, p)
        (h : _) -> (h, "")
        [] -> ("", "")

    host :: String
    host = if null hst' then "localhost" else hst'

    nport :: Either String PortNumber
    nport
        | null port = Right defPort
        | otherwise = case readMaybe port :: Maybe Integer of
            Just n | n >= 0 && n <= 65535 -> Right (fromInteger n)
            _ -> Left ("invalid port number: " ++ port)