{-# LANGUAGE BangPatterns, DeriveDataTypeable, OverloadedStrings, ScopedTypeVariables #-}
-- |
--
-- A client library for AMQP servers implementing the 0-9-1 spec; currently only supports RabbitMQ (see <http://www.rabbitmq.com>)
--
-- A good introduction to RabbitMQ and AMQP 0-9-1 (in various languages): <http://www.rabbitmq.com/getstarted.html>, <http://www.rabbitmq.com/tutorials/amqp-concepts.html>
--
-- == Example
--
-- Connect to a server, declare a queue and an exchange and set up a callback for messages coming in on the queue. Then publish a single message to our new exchange
--
-- >{-# LANGUAGE OverloadedStrings #-}
-- >import Network.AMQP
-- >import qualified Data.ByteString.Lazy.Char8 as BL
-- >
-- >main = do
-- >    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
-- >    chan <- openChannel conn
-- >
-- >    -- declare a queue, exchange and binding
-- >    declareQueue chan newQueue {queueName = "myQueue"}
-- >    declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}
-- >    bindQueue chan "myQueue" "myExchange" "myKey"
-- >
-- >    -- subscribe to the queue
-- >    consumeMsgs chan "myQueue" Ack myCallback
-- >
-- >    -- publish a message to our new exchange
-- >    publishMsg chan "myExchange" "myKey"
-- >        newMsg {msgBody = (BL.pack "hello world"),
-- >                msgDeliveryMode = Just Persistent}
-- >
-- >    getLine -- wait for keypress
-- >    closeConnection conn
-- >    putStrLn "connection closed"
-- >
-- >
-- >myCallback :: (Message,Envelope) -> IO ()
-- >myCallback (msg, env) = do
-- >    putStrLn $ "received message: " ++ (BL.unpack $ msgBody msg)
-- >    -- acknowledge receiving the message
-- >    ackEnv env
--
-- == Exception handling notes
--
-- Some function calls can make the AMQP server throw an AMQP exception, which has the side-effect of closing the connection or channel. The AMQP exceptions are raised as Haskell exceptions (see 'AMQPException'). So upon receiving an 'AMQPException' you may have to reopen the channel or connection.
--
-- == Debugging tips
--
-- If you need to debug a problem with e.g. channels being closed unexpectedly, here are some tips:
--
-- - The RabbitMQ log file often has helpful error-messages. The location of the log-file differs by OS. Look for RABBITMQ_LOGS in this page: <https://www.rabbitmq.com/relocate.html>
--
-- - The function 'addChannelExceptionHandler' can be used to figure out when and why a channel was closed.
--
-- - RabbitMQ has a browser-based management console, which allows you to see connections, channels, queues and more. Setup instructions are here: <https://www.rabbitmq.com/management.html>
--

module Network.AMQP (
    -- * Connection

    Connection,
    ConnectionOpts(..),
    TLSSettings(..),
    defaultConnectionOpts,
    openConnection,
    openConnection',
    openConnection'',
    closeChannel,
    closeConnection,
    addConnectionClosedHandler,
    addConnectionBlockedHandler,
    getServerProperties,

    -- * Channel

    Channel,
    openChannel,
    addReturnListener,
    addChannelExceptionHandler,
    isNormalChannelClose,
    qos,

    -- * Exchanges

    ExchangeOpts(..),
    newExchange,
    declareExchange,
    bindExchange,
    bindExchange',
    unbindExchange,
    unbindExchange',
    deleteExchange,

    -- * Queues

    QueueOpts(..),
    newQueue,
    declareQueue,
    bindQueue,
    bindQueue',
    unbindQueue,
    unbindQueue',
    purgeQueue,
    deleteQueue,

    -- * Messaging

    Message(..),
    DeliveryMode(..),
    PublishError(..),
    ReturnReplyCode(..),
    newMsg,
    Envelope(..),
    ConsumerTag,
    Ack(..),
    consumeMsgs,
    consumeMsgs',
    cancelConsumer,
    publishMsg,
    publishMsg',
    getMsg,
    rejectMsg,
    rejectEnv,
    recoverMsgs,

    ackMsg,
    ackEnv,
    nackMsg,
    nackEnv,

    -- * Transactions

    txSelect,
    txCommit,
    txRollback,

    -- * Confirms

    confirmSelect,
    waitForConfirms,
    waitForConfirmsUntil,
    addConfirmationListener,
    ConfirmationResult(..),
    AckType(..),

    -- * Flow Control

    flow,

    -- * SASL

    SASLMechanism(..),
    plain,
    amqplain,
    rabbitCRdemo,

    -- * Exceptions

    AMQPException(..),
    ChanThreadKilledException,
    CloseType(..),

    -- * URI parsing

    fromURI
) where

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad(when)
import Data.List.Split (splitOn)
import Data.Binary
import Data.Binary.Put
import Network.Socket (PortNumber)
import Network.URI (unEscapeString)
import Data.Text (Text)

import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString as BS
import qualified Data.IntSet as IntSet
import qualified Data.Text.Encoding as E
import qualified Data.Map as M
import qualified Data.Text as T

import Network.AMQP.Types
import Network.AMQP.Generated
import Network.AMQP.Internal
import Network.AMQP.Helpers

----- EXCHANGE -----


-- | A record that contains the fields needed when creating a new exchange using 'declareExchange'. The default values apply when you use 'newExchange'.
data ExchangeOpts = ExchangeOpts {
    exchangeName :: Text, -- ^ (must be set); the name of the exchange
    exchangeType :: Text, -- ^ (must be set); the type of the exchange (\"fanout\", \"direct\", \"topic\", \"headers\")

    -- optional
    exchangePassive :: Bool, -- ^ (default 'False'); If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.
    exchangeDurable :: Bool, -- ^ (default 'True'); If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.
    exchangeAutoDelete :: Bool, -- ^ (default 'False'); If set, the exchange is deleted when all queues have finished using it.
    exchangeInternal :: Bool, -- ^ (default 'False'); If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.
    exchangeArguments :: FieldTable -- ^ (default empty); A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation.
} deriving (Eq, Ord, Read, Show)

-- | An 'ExchangeOpts' with defaults set; you must override at least the 'exchangeName' and 'exchangeType' fields.
--
-- The name and type start out empty; only 'exchangeDurable' defaults to 'True', every other flag is 'False' and the argument table is empty.
newExchange :: ExchangeOpts
newExchange = ExchangeOpts
    { exchangeName       = ""
    , exchangeType       = ""
    , exchangePassive    = False
    , exchangeDurable    = True
    , exchangeAutoDelete = False
    , exchangeInternal   = False
    , exchangeArguments  = FieldTable M.empty
    }

-- | Declares a new exchange on the AMQP server. Can be used like this: @declareExchange channel newExchange {exchangeName = \"myExchange\", exchangeType = \"fanout\"}@
--
-- Sends an @Exchange_declare@ request built from the given 'ExchangeOpts' and waits for the reply, which is expected to be @Exchange_declare_ok@; a reply of any other shape makes the pattern bind below fail in 'IO'.
declareExchange :: Channel -> ExchangeOpts -> IO ()
declareExchange chan exchg = do
    SimpleMethod Exchange_declare_ok <- request chan $ SimpleMethod $ Exchange_declare
        1 -- ticket; ignored by rabbitMQ
        (ShortString $ exchangeName exchg) -- exchange
        (ShortString $ exchangeType exchg) -- typ
        (exchangePassive exchg) -- passive
        (exchangeDurable exchg) -- durable
        (exchangeAutoDelete exchg) -- auto_delete
        (exchangeInternal exchg) -- internal
        False -- nowait
        (exchangeArguments exchg) -- arguments
    return ()

-- | @bindExchange chan destinationName sourceName routingKey@ binds the destination exchange to the source exchange using the provided routing key.
--
-- Equivalent to 'bindExchange'' with an empty argument table.
bindExchange :: Channel -> Text -> Text -> Text -> IO ()
bindExchange chan destinationName sourceName routingKey =
    bindExchange' chan destinationName sourceName routingKey (FieldTable M.empty)

-- | An extended version of @bindExchange@ that allows you to include arbitrary arguments. This is useful to use the @headers@ exchange-type.
--
-- Sends an @Exchange_bind@ request and waits for the reply, which is expected to be @Exchange_bind_ok@; a reply of any other shape makes the pattern bind below fail in 'IO'.
bindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
bindExchange' chan destinationName sourceName routingKey args = do
    SimpleMethod Exchange_bind_ok <- request chan $ SimpleMethod $ Exchange_bind
        1 -- ticket; ignored by rabbitMQ
        (ShortString destinationName)
        (ShortString sourceName)
        (ShortString routingKey)
        False -- nowait
        args -- arguments
    return ()

-- | @unbindExchange chan destinationName sourceName routingKey@ unbinds an exchange from an exchange. The @routingKey@ must be identical to the one specified when binding the exchange.
--
-- Equivalent to 'unbindExchange'' with an empty argument table.
unbindExchange :: Channel -> Text -> Text -> Text -> IO ()
unbindExchange chan destinationName sourceName routingKey =
    unbindExchange' chan destinationName sourceName routingKey (FieldTable M.empty)

-- | An extended version of @unbindExchange@ that allows you to include arguments. The @arguments@ must be identical to the ones specified when binding the exchange.
--
-- Sends an @Exchange_unbind@ request and waits for the reply, which is expected to be @Exchange_unbind_ok@; a reply of any other shape makes the pattern bind below fail in 'IO'.
unbindExchange' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
unbindExchange' chan destinationName sourceName routingKey args = do
    SimpleMethod Exchange_unbind_ok <- request chan $ SimpleMethod $ Exchange_unbind
        1 -- ticket
        (ShortString destinationName)
        (ShortString sourceName)
        (ShortString routingKey)
        False -- nowait
        args -- arguments
    return ()

-- | Deletes the exchange with the provided name.
--
-- The request is sent with @if_unused@ set to 'False', so the deletion is not made conditional on the exchange having no queue bindings. The reply is expected to be @Exchange_delete_ok@; a reply of any other shape makes the pattern bind below fail in 'IO'.
deleteExchange :: Channel -> Text -> IO ()
deleteExchange chan exchange = do
    SimpleMethod Exchange_delete_ok <- request chan $ SimpleMethod $ Exchange_delete
        1 -- ticket; ignored by rabbitMQ
        (ShortString exchange) -- exchange
        False -- if_unused;  If set, the server will only delete the exchange if it has no queue bindings.
        False -- nowait
    return ()

----- QUEUE -----


-- | A record that contains the fields needed when creating a new queue using 'declareQueue'. The default values apply when you use 'newQueue'.
data QueueOpts = QueueOpts {
    -- usually set (may be left empty to let the server pick a name)
    queueName :: Text, -- ^ (default \"\"); the name of the queue; if left empty, the server will generate a new name and return it from the 'declareQueue' method

    -- optional
    queuePassive :: Bool, -- ^ (default 'False'); If set, the server will not create the queue.  The client can use this to check whether a queue exists without modifying the server state.
    queueDurable :: Bool, -- ^ (default 'True'); If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.
    queueExclusive :: Bool, -- ^ (default 'False'); Exclusive queues may only be consumed from by the current connection. Setting the 'exclusive' flag always implies 'auto-delete'.
    queueAutoDelete :: Bool, -- ^ (default 'False'); If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.
    queueHeaders :: FieldTable -- ^ (default empty): Headers to use when creating this queue, such as @x-message-ttl@ or @x-dead-letter-exchange@.
} deriving (Eq, Ord, Read, Show)

-- | A 'QueueOpts' with defaults set; you should override at least 'queueName'.
--
-- The name starts out empty; only 'queueDurable' defaults to 'True', every other flag is 'False' and the header table is empty.
newQueue :: QueueOpts
newQueue = QueueOpts
    { queueName       = ""
    , queuePassive    = False
    , queueDurable    = True
    , queueExclusive  = False
    , queueAutoDelete = False
    , queueHeaders    = FieldTable M.empty
    }

-- | Creates a new queue on the AMQP server; can be used like this: @declareQueue channel newQueue {queueName = \"myQueue\"}@.
--
-- Returns a tuple @(queue, messageCount, consumerCount)@.
-- @queue@ is the name of the new queue (if you don't specify a queue the server will autogenerate one).
-- @messageCount@ is the number of messages in the queue, which will be zero for newly-created queues. @consumerCount@ is the number of active consumers for the queue.
--
-- Both counts arrive as 'Word32' in the @Queue_declare_ok@ reply and are converted to 'Int' with 'fromIntegral'. A reply of any other shape makes the pattern bind below fail in 'IO'.
declareQueue :: Channel -> QueueOpts -> IO (Text, Int, Int)
declareQueue chan queue = do
    SimpleMethod (Queue_declare_ok (ShortString qName) messageCount consumerCount) <- request chan $ SimpleMethod $ Queue_declare
        1 -- ticket
        (ShortString $ queueName queue)
        (queuePassive queue)
        (queueDurable queue)
        (queueExclusive queue)
        (queueAutoDelete queue)
        False -- no-wait; true means no answer from server
        (queueHeaders queue)
    return (qName, fromIntegral messageCount, fromIntegral consumerCount)

-- | @bindQueue chan queue exchange routingKey@ binds the queue to the exchange using the provided routing key. If @exchange@ is the empty string, the default exchange will be used.
--
-- Equivalent to 'bindQueue'' with an empty argument table.
bindQueue :: Channel -> Text -> Text -> Text -> IO ()
bindQueue chan queue exchange routingKey =
    bindQueue' chan queue exchange routingKey (FieldTable M.empty)

-- | An extended version of @bindQueue@ that allows you to include arbitrary arguments. This is useful to use the @headers@ exchange-type.
--
-- Sends a @Queue_bind@ request and waits for the reply, which is expected to be @Queue_bind_ok@; a reply of any other shape makes the pattern bind below fail in 'IO'.
bindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
bindQueue' chan queue exchange routingKey args = do
    SimpleMethod Queue_bind_ok <- request chan $ SimpleMethod $ Queue_bind
        1 -- ticket; ignored by rabbitMQ
        (ShortString queue)
        (ShortString exchange)
        (ShortString routingKey)
        False -- nowait
        args -- arguments
    return ()

-- | @unbindQueue chan queue exchange routingKey@ unbinds a queue from an exchange. The @routingKey@ must be identical to the one specified when binding the queue.
--
-- Equivalent to 'unbindQueue'' with an empty argument table.
unbindQueue :: Channel -> Text -> Text -> Text -> IO ()
unbindQueue chan queue exchange routingKey =
    unbindQueue' chan queue exchange routingKey (FieldTable M.empty)

-- | An extended version of @unbindQueue@ that allows you to include arguments. The @arguments@ must be identical to the ones specified when binding the queue.
--
-- Sends a @Queue_unbind@ request (which, unlike @Queue_bind@, takes no @nowait@ flag) and waits for the reply, which is expected to be @Queue_unbind_ok@; a reply of any other shape makes the pattern bind below fail in 'IO'.
unbindQueue' :: Channel -> Text -> Text -> Text -> FieldTable -> IO ()
unbindQueue' chan queue exchange routingKey args = do
    SimpleMethod Queue_unbind_ok <- request chan $ SimpleMethod $ Queue_unbind
        1 -- ticket
        (ShortString queue)
        (ShortString exchange)
        (ShortString routingKey)
        args -- arguments
    return ()

-- | Removes all messages from the queue; returns the number of messages that were in the queue.
--
-- The count is taken unchanged from the @Queue_purge_ok@ reply; a reply of any other shape makes the pattern bind below fail in 'IO'.
purgeQueue :: Channel -> Text -> IO Word32
purgeQueue chan queue = do
    SimpleMethod (Queue_purge_ok msgCount) <- request chan $ SimpleMethod $ Queue_purge
        1 -- ticket
        (ShortString queue) -- queue
        False -- nowait
    return msgCount

-- | Deletes the queue; returns the number of messages that were in the queue before deletion.
--
-- The request is sent with both @if_unused@ and @if_empty@ set to 'False'. The count is taken unchanged from the @Queue_delete_ok@ reply; a reply of any other shape makes the pattern bind below fail in 'IO'.
deleteQueue :: Channel -> Text -> IO Word32
deleteQueue chan queue = do
    SimpleMethod (Queue_delete_ok msgCount) <- request chan $ SimpleMethod $ Queue_delete
        1 -- ticket
        (ShortString queue) -- queue
        False -- if_unused
        False -- if_empty
        False -- nowait
    return msgCount

----- MSG (the BASIC class in AMQP) -----


-- | A 'Message' with defaults set; you should override at least 'msgBody'.
--
-- The body is the empty lazy 'BL.ByteString' and each of the fourteen optional properties is 'Nothing'.
newMsg :: Message
newMsg = Message
    BL.empty -- body
    -- the fourteen optional properties, all unset
    Nothing Nothing Nothing Nothing Nothing Nothing Nothing
    Nothing Nothing Nothing Nothing Nothing Nothing Nothing

-- | Specifies whether you have to acknowledge messages that you receive from 'consumeMsgs' or 'getMsg'. If you use 'Ack', you have to call 'ackMsg' or 'ackEnv' after you have processed a message, otherwise it might be delivered again in the future.
data Ack = Ack | NoAck
  deriving (Eq, Ord, Read, Show)

-- Converts an 'Ack' mode to a flag: 'Ack' gives 'False', 'NoAck' gives 'True'.
-- NOTE(review): presumably this is the value of the AMQP @no_ack@ field (hence
-- the inversion); the call sites are outside this section, so confirm there.
ackToBool :: Ack -> Bool
ackToBool Ack = False
ackToBool NoAck = True

-- | @consumeMsgs chan queue ack callback@ subscribes to the given queue and returns a consumerTag. For any incoming message, the callback will be run. If @ack == 'Ack'@ you will have to acknowledge all incoming messages (see 'ackMsg' and 'ackEnv')
--
-- This is 'consumeMsgs'' with a cancellation callback that does nothing and an empty argument table.
--
-- === Exceptions in the callback
--
-- If an exception occurs in @callback@, it will be caught and printed to @stderr@. But you should not depend on this behaviour (it might change in future versions of this library);
--  instead, it is /strongly/ recommended that you catch any exceptions that your callback may throw and handle them appropriately.
-- But make sure not to catch 'ChanThreadKilledException' (or re-throw it if you did catch it), since it is used internally by the library to close channels.
--
-- So unless you are confident that your callback won't throw exceptions, you may want to structure your code like this:
--
-- > consumeMsgs chan name Ack $ \(msg, env) -> (do ...)
-- >        `CE.catches`
-- >    [
-- >        -- rethrow this exception, since the AMQP library uses it internally
-- >        CE.Handler $ \(e::ChanThreadKilledException) -> CE.throwIO e,
-- >
-- >        -- (optional) catch individual exceptions that your code may throw
-- >        CE.Handler $ \(e::CE.IOException) -> ...,
-- >        CE.Handler $ \(e::SomeOtherException) -> ...,
-- >
-- >        -- catch all exceptions that weren't handled above
-- >        CE.Handler $ \(e::CE.SomeException) -> ...
-- >    ]
--
-- In practice, it might be useful to encapsulate this exception-handling logic in a custom wrapper-function so that you can reuse it for every callback you pass to @consumeMsgs@.
--
-- === Blocking requests in the callback
--
-- The @callback@ will be run on the channel's receiver thread (which is responsible for handling all incoming messages on this channel, including responses to requests from the client) so DO NOT perform any blocking request on @chan@ inside the callback, as this would lead to a dead-lock. However, you CAN perform requests on other open channels inside the callback, though that would keep @chan@ blocked until the requests are done, so it is not recommended.
--
-- Unless you're using AMQP flow control, the following functions can safely be called on @chan@: 'ackMsg', 'ackEnv', 'rejectMsg', 'publishMsg'. If you use flow-control or want to perform anything more complex, it's recommended that instead of using 'consumeMsgs' you use 'getMsg' to fetch messages in a loop (because then your message-handling code will not run in the channel's receiver thread, so there will be no problems when performing blocking requests).
consumeMsgs :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> IO ConsumerTag
consumeMsgs chan queue ack callback =
    consumeMsgs' chan queue ack callback ignoreCancel (FieldTable M.empty)
  where
    -- cancellation callback that discards the consumer tag and does nothing
    ignoreCancel _ = return ()

-- | an extended version of @consumeMsgs@ that allows you to define a consumer cancellation callback and include arbitrary arguments.
consumeMsgs' :: Channel -> Text -> Ack -> ((Message,Envelope) -> IO ()) -> (ConsumerTag -> IO ()) -> FieldTable -> IO ConsumerTag
consumeMsgs' chan queue ack callback cancelCB args = do
    --generate a new consumer tag (the incremented per-channel counter, rendered as text)
    newConsumerTag <- fmap (T.pack . show) $ modifyMVar (lastConsumerTag chan) $ \c -> return (c+1,c+1)

    --register the consumer before sending the request, which is issued with nowait=True
    modifyMVar_ (consumers chan) $ return . M.insert newConsumerTag (callback, cancelCB)

    writeAssembly chan (SimpleMethod $ Basic_consume
        1 -- ticket
        (ShortString queue) -- queue
        (ShortString newConsumerTag) -- consumer_tag
        False -- no_local; If the no-local field is set the server will not send messages to the client that published them.
        (ackToBool ack) -- no_ack
        False -- exclusive; Request exclusive consumer access, meaning only this consumer can access the queue.
        True -- nowait
        args
        )
    return newConsumerTag

-- | stops a consumer that was started with 'consumeMsgs'
cancelConsumer :: Channel -> ConsumerTag -> IO ()
cancelConsumer chan consumerTag = do
    -- synchronous request (nowait=False): wait for the broker's cancel-ok
    SimpleMethod (Basic_cancel_ok _) <- request chan $ SimpleMethod $ Basic_cancel
        (ShortString consumerTag) -- consumer_tag
        False -- nowait

    --unregister the consumer
    modifyMVar_ (consumers chan) $ return . M.delete consumerTag

-- | @publishMsg chan exchange routingKey msg@ publishes @msg@ to the exchange named @exchange@. The effect of @routingKey@ depends on the type of the exchange.
--
-- Returns the sequence-number of the message (only if the channel is in publisher confirm mode; see 'confirmSelect').
--
-- NOTE: This method may temporarily block if the AMQP server requested us to stop sending content data (using the flow control mechanism). So don't rely on this method returning immediately.
publishMsg :: Channel -> Text -> Text -> Message -> IO (Maybe Int)
publishMsg chan exchange routingKey msg = publishMsg' chan exchange routingKey False msg

-- | Like 'publishMsg', but additionally allows you to specify whether the 'mandatory' flag should be set.
publishMsg' :: Channel -> Text -> Text -> Bool -> Message -> IO (Maybe Int)
publishMsg' chan exchange routingKey mandatory msg =
    -- The frame is written while the sequence-number MVar is held
    -- (presumably so that sequence numbers follow the order in which
    -- messages are written to the channel -- confirm before relying on it).
    modifyMVar (nextPublishSeqNum chan) $ \nxtSeqNum -> do
        writeAssembly chan $ ContentMethod
            (Basic_publish
                1 -- ticket; ignored by rabbitMQ
                (ShortString exchange)
                (ShortString routingKey)
                mandatory -- mandatory; if true, the server might return the msg, which is currently not handled
                False --immediate; not customizable, as it is currently not supported anymore by RabbitMQ
            )
            (CHBasic
                (ShortString <$> msgContentType msg)
                (ShortString <$> msgContentEncoding msg)
                (msgHeaders msg)
                (deliveryModeToInt <$> msgDeliveryMode msg)
                (msgPriority msg)
                (ShortString <$> msgCorrelationID msg)
                (ShortString <$> msgReplyTo msg)
                (ShortString <$> msgExpiration msg)
                (ShortString <$> msgID msg)
                (msgTimestamp msg)
                (ShortString <$> msgType msg)
                (ShortString <$> msgUserID msg)
                (ShortString <$> msgApplicationID msg)
                (ShortString <$> msgClusterID msg)
            )
            (msgBody msg)

        -- a counter of 0 is left untouched and yields no sequence number;
        -- 'confirmSelect' is what moves it from 0 to 1
        if nxtSeqNum /= 0
            then do
                -- remember the message as awaiting confirmation
                atomically $ modifyTVar' (unconfirmedSet chan) $ \uSet -> IntSet.insert nxtSeqNum uSet
                return (succ nxtSeqNum, Just nxtSeqNum)
            else return (0, Nothing)

-- | @getMsg chan ack queue@ gets a message from the specified queue. If @ack=='Ack'@, you have to call 'ackMsg' or 'ackEnv' for any message that you get, otherwise it might be delivered again in the future (by calling 'recoverMsgs').
--
-- Will return @Nothing@ when no message is currently available.
getMsg :: Channel -> Ack -> Text -> IO (Maybe (Message, Envelope))
getMsg chan ack queue = do
    ret <- request chan $ SimpleMethod $ Basic_get
        1 -- ticket
        (ShortString queue) -- queue
        (ackToBool ack) -- no_ack
    case ret of
        ContentMethod (Basic_get_ok deliveryTag redelivered (ShortString exchange) (ShortString routingKey) _) properties body ->
            return $ Just (msgFromContentHeaderProperties properties body,
                           Envelope {envDeliveryTag = deliveryTag, envRedelivered = redelivered,
                                     envExchangeName = exchange, envRoutingKey = routingKey, envChannel = chan})
        -- any other reply is treated as "no message available"
        _ -> return Nothing

{- | @ackMsg chan deliveryTag multiple@ acknowledges one or more messages. A message MUST not be acknowledged more than once.

if @multiple==True@, the @deliveryTag@ is treated as \"up to and including\", so that the client can acknowledge multiple messages with a single method call. If @multiple==False@, @deliveryTag@ refers to a single message.

If @multiple==True@, and @deliveryTag==0@, tells the server to acknowledge all outstanding messages.
-}
ackMsg :: Channel -> LongLongInt -> Bool -> IO ()
ackMsg chan deliveryTag multiple =
    writeAssembly chan $ SimpleMethod $ Basic_ack
        deliveryTag -- delivery_tag
        multiple -- multiple


-- | Acknowledges a single message. This is a wrapper for 'ackMsg' in case you have the 'Envelope' at hand.
ackEnv :: Envelope -> IO ()
ackEnv env = ackMsg (envChannel env) (envDeliveryTag env) False

{- | @nackMsg chan deliveryTag multiple requeue@ rejects one or more messages. A message MUST not be rejected more than once.

if @multiple==True@, the @deliveryTag@ is treated as \"up to and including\", so that the client can reject multiple messages with a single method call. If @multiple==False@, @deliveryTag@ refers to a single message.

If @requeue==True@, the server will try to requeue the message. If @requeue==False@, the message will be dropped by the server.
-}
nackMsg :: Channel -> LongLongInt -> Bool -> Bool -> IO ()
nackMsg chan deliveryTag multiple requeue =
    writeAssembly chan $ SimpleMethod $ Basic_nack
        deliveryTag -- delivery_tag
        multiple -- multiple
        requeue -- requeue


-- | Reject a single message. This is a wrapper for 'nackMsg' in case you have the 'Envelope' at hand. The message is not requeued (@multiple@ and @requeue@ are both passed as @False@).
nackEnv :: Envelope -> IO ()
nackEnv env = nackMsg (envChannel env) (envDeliveryTag env) False False

-- | @rejectMsg chan deliveryTag requeue@ allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable  messages to their original queue. If @requeue==False@, the message will be discarded.  If it is 'True', the server will attempt to requeue the message.
--
-- NOTE: RabbitMQ 1.7 doesn't implement this command
rejectMsg :: Channel -> LongLongInt -> Bool -> IO ()
rejectMsg chan deliveryTag requeue =
    writeAssembly chan $ SimpleMethod $ Basic_reject
        deliveryTag -- delivery_tag
        requeue -- requeue


-- | Reject a message. This is a wrapper for 'rejectMsg' in case you have the 'Envelope' at hand.
rejectEnv :: Envelope
          -> Bool -- ^ requeue
          -> IO ()
rejectEnv env requeue = rejectMsg (envChannel env) (envDeliveryTag env) requeue

-- | @recoverMsgs chan requeue@ asks the broker to redeliver all messages that were received but not acknowledged on the specified channel.
-- If @requeue==False@, the message will be redelivered to the original recipient. If @requeue==True@, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.
recoverMsgs :: Channel -> Bool -> IO ()
recoverMsgs chan requeue = do
    -- synchronous: wait for the broker's recover-ok reply
    SimpleMethod Basic_recover_ok <- request chan $ SimpleMethod $ Basic_recover
        requeue -- requeue
    return ()

------------------- TRANSACTIONS (TX) --------------------------


-- | This method sets the channel to use standard transactions.  The client must use this method at least once on a channel before using the Commit or Rollback methods.
txSelect :: Channel -> IO ()
txSelect chan = do
    -- synchronous: wait for the broker's select-ok reply
    SimpleMethod Tx_select_ok <- request chan $ SimpleMethod Tx_select
    return ()

-- | This method commits all messages published and acknowledged in the current transaction.  A new transaction starts immediately after a commit.
txCommit :: Channel -> IO ()
txCommit chan = do
    -- synchronous: wait for the broker's commit-ok reply
    SimpleMethod Tx_commit_ok <- request chan $ SimpleMethod Tx_commit
    return ()

-- | This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
txRollback :: Channel -> IO ()
txRollback chan = do
    -- synchronous: wait for the broker's rollback-ok reply
    SimpleMethod Tx_rollback_ok <- request chan $ SimpleMethod Tx_rollback
    return ()


-------------------- PUBLISHER CONFIRMS ------------------------


{- | @confirmSelect chan nowait@ puts the channel in publisher confirm mode. This mode is a RabbitMQ
    extension where a producer receives confirmations when messages are successfully processed by
    the broker. Publisher confirms are a relatively lightweight alternative to full transactional
    mode. For details about the delivery guarantees and performance implications of this mode, see
    https://www.rabbitmq.com/confirms.html. Note that on a single channel, publisher confirms and
    transactions are mutually exclusive (you cannot select both at the same time).
    When @nowait==True@ the server will not send back a response to this method.
-}
confirmSelect :: Channel -> Bool -> IO ()
confirmSelect chan nowait = do
    -- start the publish sequence counter at 1 if it is still 0; an already
    -- running counter is left unchanged, so calling this twice does not reset it
    modifyMVar_ (nextPublishSeqNum chan) $ \seqn -> return $ if seqn == 0 then 1 else seqn
    if nowait
        then writeAssembly chan $ SimpleMethod (Confirm_select True)
        else do
            (SimpleMethod Confirm_select_ok) <- request chan $ SimpleMethod (Confirm_select False)
            return ()

{- | Calling this function will cause the invoking thread to block until all previously published
messages have been acknowledged by the broker (positively or negatively). Returns a value of type
'ConfirmationResult', holding a tuple of two IntSets @(acked, nacked)@, containing the delivery tags
for the messages that have been confirmed by the broker.
-}
waitForConfirms :: Channel -> IO ConfirmationResult
waitForConfirms chan = atomically $ Complete <$> waitForAllConfirms chan

{- | Same as 'waitForConfirms', but with a timeout in microseconds. Note that, since this operation
may timeout before the server has acked or nacked all pending messages, the returned 'ConfirmationResult'
should be pattern-matched for the constructors @Complete (acked, nacked)@ and @Partial (acked, nacked, pending)@
-}
waitForConfirmsUntil :: Channel -> Int -> IO ConfirmationResult
waitForConfirmsUntil chan timeout = do
    -- TVar that is read as the "expired" flag below
    delay <- registerDelay timeout
    let -- fallback branch: only yields a result once the delay has expired;
        -- it then drains the acked/nacked sets and reports what is still pending
        partial = do
            expired <- readTVar delay
            if expired
                then Partial <$> ((,,)
                    <$> swapTVar (ackedSet chan) IntSet.empty
                    <*> swapTVar (nackedSet chan) IntSet.empty
                    <*> readTVar (unconfirmedSet chan))
                else retry
        complete = Complete <$> waitForAllConfirms chan
    -- try the complete result first; fall back to the partial one
    atomically $ complete `orElse` partial

{- | Adds a handler which will be invoked each time the @Channel@ receives a confirmation from the broker.
The parameters passed to the handler are the @deliveryTag@ for the message being confirmed, a flag
indicating whether the confirmation refers to this message individually (@False@) or all messages up to
this one (@True@) and an @AckType@ whose value can be either @BasicAck@ or @BasicNack@.
-}
addConfirmationListener :: Channel -> ((Word64, Bool, AckType) -> IO ()) -> IO ()
addConfirmationListener chan handler =
    -- the new handler is prepended to the channel's listener list
    modifyMVar_ (confirmListeners chan) $ \listeners -> return $ handler:listeners

--------------------- FLOW CONTROL ------------------------


{- | @flow chan active@ tells the AMQP server to pause or restart the flow of content
    data. This is a simple flow-control mechanism that a peer can use
    to avoid overflowing its queues or otherwise finding itself receiving
    more messages than it can process.

    If @active==True@ the server will start sending content data, if @active==False@ the server will stop sending content data.

    A new channel is always active by default.

    NOTE: RabbitMQ 1.7 doesn't implement this command.
    -}
flow :: Channel -> Bool -> IO ()
flow chan active = do
    -- synchronous: wait for the broker's flow-ok reply (its payload is ignored)
    SimpleMethod (Channel_flow_ok _) <- request chan $ SimpleMethod (Channel_flow active)
    return ()

-- | Constructs default connection options with the following settings :
--
--     * Broker: @amqp:\/\/guest:guest\@localhost:5672\/%2F@ using the @PLAIN@ SASL mechanism
--
--     * max frame size: @131072@
--
--     * use the heartbeat delay suggested by the broker
--
--     * no limit on the number of used channels
--
defaultConnectionOpts :: ConnectionOpts
defaultConnectionOpts = ConnectionOpts
    [("localhost", 5672)]    -- servers to try: a single local broker on port 5672
    "/"                      -- virtual host
    [plain "guest" "guest"]  -- SASL mechanisms: PLAIN with the guest account
    (Just 131072)            -- max frame size
    Nothing                  -- NOTE(review): presumably the heartbeat delay (Nothing = use the broker's suggestion) -- confirm field order
    Nothing                  -- NOTE(review): presumably the channel limit (Nothing = unlimited) -- confirm field order
    Nothing                  -- TLS settings: none
    Nothing                  -- NOTE(review): optional Text field, presumably a connection name -- confirm

-- | @openConnection hostname virtualHost loginName loginPassword@ opens a connection to an AMQP server running on @hostname@.
-- @virtualHost@ is used as a namespace for AMQP resources (default is \"/\"), so different applications could use multiple virtual hosts on the same AMQP server.
--
-- You must call 'closeConnection' before your program exits to ensure that all published messages are received by the server.
--
-- The @loginName@ and @loginPassword@ will be used to authenticate via the 'PLAIN' SASL mechanism.
--
-- NOTE: If the login name, password or virtual host are invalid, this method will throw a 'ConnectionClosedException'. The exception will not contain a reason why the connection was closed, so you'll have to find out yourself.
openConnection :: String -> Text -> Text -> Text -> IO Connection
-- Partially applied: the remaining virtual host, login name and password
-- arguments are passed straight through to openConnection' on port 5672.
openConnection host = openConnection' host 5672

-- | Same as 'openConnection' but allows you to specify a non-default port-number as the 2nd parameter.
--
-- All settings other than the server, the virtual host and the credentials are
-- taken from 'defaultConnectionOpts'.
openConnection' :: String -> PortNumber -> Text -> Text -> Text -> IO Connection
openConnection' host port vhost loginName loginPassword = openConnection'' $ defaultConnectionOpts {
    coServers = [(host, port)],
    coVHost   = vhost,
    coAuth    = [plain loginName loginPassword]
}

-- | The @PLAIN@ SASL mechanism. See <http://tools.ietf.org/html/rfc4616 RFC4616>
--
-- The initial response is the UTF-8 encoding of
-- @NUL ++ loginName ++ NUL ++ loginPassword@. No challenge handler is
-- supplied (the third 'SASLMechanism' argument is 'Nothing').
plain :: Text -> Text -> SASLMechanism
plain loginName loginPassword = SASLMechanism "PLAIN" initialResponse Nothing
  where
    nul :: Char
    nul = '\0'

    initialResponse :: ByteString
    initialResponse = E.encodeUtf8 $ T.cons nul loginName `T.append` T.cons nul loginPassword

-- | The @AMQPLAIN@ SASL mechanism. See <http://www.rabbitmq.com/authentication.html>.
--
-- The initial response is a serialized field table with the entries @LOGIN@
-- and @PASSWORD@ (both UTF-8 encoded), minus its first four bytes.
amqplain :: Text -> Text -> SASLMechanism
amqplain loginName loginPassword = SASLMechanism "AMQPLAIN" initialResponse Nothing
  where
    initialResponse :: ByteString
    -- NOTE(review): the 4 dropped bytes are presumably the table's length
    -- prefix written by the 'FieldTable' Binary instance -- confirm.
    initialResponse = toStrict $ BL.drop 4 $ runPut $ put $ FieldTable $ M.fromList
        [ ("LOGIN",    FVString $ E.encodeUtf8 loginName)
        , ("PASSWORD", FVString $ E.encodeUtf8 loginPassword)
        ]

-- | The @RABBIT-CR-DEMO@ SASL mechanism needs to be explicitly enabled on the RabbitMQ server and should only be used for demonstration purposes of the challenge-response cycle.
-- See <http://www.rabbitmq.com/authentication.html>.
--
-- The initial response is the UTF-8 encoded login name; whatever challenge is
-- received is ignored and answered with @\"My password is \" ++ loginPassword@.
rabbitCRdemo :: Text -> Text -> SASLMechanism
rabbitCRdemo loginName loginPassword = SASLMechanism "RABBIT-CR-DEMO" initialResponse (Just $ const challengeResponse)
  where
    initialResponse :: ByteString
    initialResponse = E.encodeUtf8 loginName

    challengeResponse :: IO ByteString
    challengeResponse = return $ E.encodeUtf8 "My password is " `BS.append` E.encodeUtf8 loginPassword

-- | @qos chan prefetchSize prefetchCount global@ limits the amount of data the server
-- delivers before requiring acknowledgements. @prefetchSize@ specifies the
-- number of bytes and @prefetchCount@ the number of messages. In both cases
-- the value 0 means unlimited.
--
-- The meaning of the @global@ flag is explained here: <http://www.rabbitmq.com/consumer-prefetch.html>
--
-- NOTE: RabbitMQ does not implement prefetchSize and will throw an exception if it doesn't equal 0.
qos :: Channel -> Word32 -> Word16 -> Bool -> IO ()
qos chan prefetchSize prefetchCount global = do
    -- The pattern is a deliberate assertion: a reply other than
    -- @Basic_qos_ok@ makes the bind fail and raises an exception in 'IO'.
    SimpleMethod Basic_qos_ok <- request chan $ SimpleMethod $ Basic_qos
        prefetchSize
        prefetchCount
        global
    return ()

-- | Parses amqp standard URI of the form @amqp:\/\/user:password\@host:port\/vhost@ and returns a @ConnectionOpts@ for use with @openConnection''@.
--
-- Any of these fields may be empty and will be replaced with defaults from @amqp:\/\/guest:guest\@localhost:5672\/@
--
-- The credentials are used with the 'plain' mechanism, and TLS
-- ('TLSTrusted') is enabled when the parser reports a TLS scheme. All other
-- options are taken from 'defaultConnectionOpts'.
fromURI :: String -> ConnectionOpts
fromURI uri = defaultConnectionOpts {
    coServers     = hostPorts',
    coVHost       = T.pack vhost,
    coAuth        = [plain (T.pack uid) (T.pack pw)],
    coTLSSettings = if tls then Just TLSTrusted else Nothing
  }
  where
    (hostPorts, uid, pw, vhost, tls) = fromURI' uri

    -- fromURI' yields plain Int ports; convert them to PortNumber.
    hostPorts' :: [(String, PortNumber)]
    hostPorts' = [(h, fromIntegral p) | (h, p) <- hostPorts]

-- | Splits an AMQP URI into @(servers, user, password, vhost, useTLS)@.
--
-- * @servers@: the comma-separated @host:port@ entries, each resolved by
--   'fromHostPort'; the default port is 5671 for the @amqps@ scheme and 5672
--   otherwise.
--
-- * @user@ and @password@ default to @\"guest\"@ when empty or when the URI
--   carries no user-info part at all.
--
-- * @vhost@ defaults to @\"/\"@ when empty.
--
-- * @useTLS@ is 'True' exactly when the scheme is @amqps@.
--
-- User, password and vhost are percent-decoded with 'unEscapeString'.
fromURI' :: String -> ([(String, Int)], String, String, String, Bool)
fromURI' uri =
    ( fromHostPort dport <$> hstPorts
    , unEscapeString uid
    , unEscapeString pw
    , unEscapeString vhost
    , tls
    )
  where
    -- look mom, no regexp dependencies
    --
    -- pre: "scheme://user:password" (or just the scheme), suf: "hosts/vhost".
    -- A URI without '@' has no user-info; splitting it at "://" keeps the
    -- host from being mistaken for the user name. Input with neither '@'
    -- nor "://" is treated as before (everything goes into pre).
    (pre, suf) = case splitOn "@" uri of
        (p : s : _) -> (p, s)
        _ -> case splitOn "://" uri of
            (scheme : rest : _) -> (scheme, rest)
            _                   -> (uri, "")

    -- The appended separators guarantee that these lazy patterns always match.
    (pro : uid' : pw' : _) = splitOn ":" (pre ++ "::")
    (hnp : thost : _)      = splitOn "/" (suf ++ "/")

    hstPorts :: [String]
    hstPorts = splitOn "," hnp

    -- uid' still carries the "//" that follows the scheme; strip it before
    -- testing for emptiness so that an empty user really falls back to the
    -- default.
    userName :: String
    userName = dropWhile (== '/') uid'

    vhost :: String
    vhost = if null thost then "/" else thost

    dport :: Int
    dport = if tls then 5671 else 5672

    uid :: String
    uid = if null userName then "guest" else userName

    pw :: String
    pw = if null pw' then "guest" else pw'

    tls :: Bool
    tls = pro == "amqps"

-- | @fromHostPort dport hostPort@ splits a @host:port@ string into its parts.
--
-- An empty host becomes @\"localhost\"@ and an empty or missing port becomes
-- @dport@. The host is percent-decoded with 'unEscapeString'. Anything after
-- a second @:@ is ignored.
--
-- Calls 'error' with a descriptive message if the port is present but is not
-- a valid number.
fromHostPort :: Int -> String -> (String, Int)
fromHostPort dport hostPort = (unEscapeString host, nport)
  where
    -- The appended ":" guarantees that this lazy pattern always matches.
    (hst' : port : _) = splitOn ":" (hostPort ++ ":")

    host :: String
    host = if null hst' then "localhost" else hst'

    nport :: Int
    nport
      | null port = dport
      | otherwise =
          -- Accepts exactly what 'read' accepts, but reports the offending
          -- text instead of failing with "Prelude.read: no parse".
          case [n | (n, rest) <- reads port, ("", "") <- lex rest] of
              [n] -> n
              _   -> error ("fromHostPort: invalid port number " ++ show port ++ " in " ++ show hostPort)