{-# LANGUAGE RecordWildCards #-}
module Network.AMQP.Extended
( RabbitMqHooks (..),
RabbitMqAdminOpts (..),
AmqpEndpoint (..),
withConnection,
openConnectionWithRetries,
mkRabbitMqAdminClientEnv,
mkRabbitMqAdminClientEnvWithCreds,
mkRabbitMqChannelMVar,
demoteOpts,
RabbitMqTlsOpts (..),
mkConnectionOpts,
mkTLSSettings,
readCredsFromEnv,
)
where
import Control.Exception (throwIO)
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Control.Monad.Trans.Maybe
import Control.Retry
import Data.Aeson
import Data.Aeson.Types
import Data.Default
import Data.Proxy
import Data.Text qualified as Text
import Data.Text.Encoding qualified as Text
import Data.X509.CertificateStore qualified as X509
import Imports
import Network.AMQP qualified as Q
import Network.Connection as Conn
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Client.TLS qualified as HTTP
import Network.RabbitMqAdmin
import Network.TLS
import Network.TLS.Extra.Cipher
import Servant
import Servant.Client
import Servant.Client qualified as Servant
import System.Logger (Logger)
import System.Logger qualified as Log
import UnliftIO.Async
data RabbitMqHooks m = RabbitMqHooks
{
forall (m :: * -> *). RabbitMqHooks m -> Channel -> m ()
onNewChannel :: Q.Channel -> m (),
forall (m :: * -> *). RabbitMqHooks m -> m ()
onConnectionClose :: m (),
forall (m :: * -> *). RabbitMqHooks m -> SomeException -> m ()
onChannelException :: SomeException -> m ()
}
data RabbitMqTlsOpts = RabbitMqTlsOpts
{ RabbitMqTlsOpts -> Maybe FilePath
caCert :: !(Maybe FilePath),
RabbitMqTlsOpts -> Bool
insecureSkipVerifyTls :: Bool
}
deriving (RabbitMqTlsOpts -> RabbitMqTlsOpts -> Bool
(RabbitMqTlsOpts -> RabbitMqTlsOpts -> Bool)
-> (RabbitMqTlsOpts -> RabbitMqTlsOpts -> Bool)
-> Eq RabbitMqTlsOpts
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RabbitMqTlsOpts -> RabbitMqTlsOpts -> Bool
== :: RabbitMqTlsOpts -> RabbitMqTlsOpts -> Bool
$c/= :: RabbitMqTlsOpts -> RabbitMqTlsOpts -> Bool
/= :: RabbitMqTlsOpts -> RabbitMqTlsOpts -> Bool
Eq, Int -> RabbitMqTlsOpts -> ShowS
[RabbitMqTlsOpts] -> ShowS
RabbitMqTlsOpts -> FilePath
(Int -> RabbitMqTlsOpts -> ShowS)
-> (RabbitMqTlsOpts -> FilePath)
-> ([RabbitMqTlsOpts] -> ShowS)
-> Show RabbitMqTlsOpts
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RabbitMqTlsOpts -> ShowS
showsPrec :: Int -> RabbitMqTlsOpts -> ShowS
$cshow :: RabbitMqTlsOpts -> FilePath
show :: RabbitMqTlsOpts -> FilePath
$cshowList :: [RabbitMqTlsOpts] -> ShowS
showList :: [RabbitMqTlsOpts] -> ShowS
Show)
parseTlsJson :: Object -> Parser (Maybe RabbitMqTlsOpts)
parseTlsJson :: Object -> Parser (Maybe RabbitMqTlsOpts)
parseTlsJson Object
v = do
Bool
enabled <- Object
v Object -> Key -> Parser (Maybe Bool)
forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"enableTls" Parser (Maybe Bool) -> Bool -> Parser Bool
forall a. Parser (Maybe a) -> a -> Parser a
.!= Bool
False
if Bool
enabled
then
RabbitMqTlsOpts -> Maybe RabbitMqTlsOpts
forall a. a -> Maybe a
Just
(RabbitMqTlsOpts -> Maybe RabbitMqTlsOpts)
-> Parser RabbitMqTlsOpts -> Parser (Maybe RabbitMqTlsOpts)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ( Maybe FilePath -> Bool -> RabbitMqTlsOpts
RabbitMqTlsOpts
(Maybe FilePath -> Bool -> RabbitMqTlsOpts)
-> Parser (Maybe FilePath) -> Parser (Bool -> RabbitMqTlsOpts)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Key -> Parser (Maybe FilePath)
forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"caCert"
Parser (Bool -> RabbitMqTlsOpts)
-> Parser Bool -> Parser RabbitMqTlsOpts
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Key -> Parser (Maybe Bool)
forall a. FromJSON a => Object -> Key -> Parser (Maybe a)
.:? Key
"insecureSkipVerifyTls" Parser (Maybe Bool) -> Bool -> Parser Bool
forall a. Parser (Maybe a) -> a -> Parser a
.!= Bool
False
)
else Maybe RabbitMqTlsOpts -> Parser (Maybe RabbitMqTlsOpts)
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe RabbitMqTlsOpts
forall a. Maybe a
Nothing
data RabbitMqAdminOpts = RabbitMqAdminOpts
{ RabbitMqAdminOpts -> FilePath
host :: !String,
RabbitMqAdminOpts -> Int
port :: !Int,
RabbitMqAdminOpts -> VHost
vHost :: !Text,
RabbitMqAdminOpts -> Maybe RabbitMqTlsOpts
tls :: Maybe RabbitMqTlsOpts,
RabbitMqAdminOpts -> Int
adminPort :: !Int
}
deriving (RabbitMqAdminOpts -> RabbitMqAdminOpts -> Bool
(RabbitMqAdminOpts -> RabbitMqAdminOpts -> Bool)
-> (RabbitMqAdminOpts -> RabbitMqAdminOpts -> Bool)
-> Eq RabbitMqAdminOpts
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: RabbitMqAdminOpts -> RabbitMqAdminOpts -> Bool
== :: RabbitMqAdminOpts -> RabbitMqAdminOpts -> Bool
$c/= :: RabbitMqAdminOpts -> RabbitMqAdminOpts -> Bool
/= :: RabbitMqAdminOpts -> RabbitMqAdminOpts -> Bool
Eq, Int -> RabbitMqAdminOpts -> ShowS
[RabbitMqAdminOpts] -> ShowS
RabbitMqAdminOpts -> FilePath
(Int -> RabbitMqAdminOpts -> ShowS)
-> (RabbitMqAdminOpts -> FilePath)
-> ([RabbitMqAdminOpts] -> ShowS)
-> Show RabbitMqAdminOpts
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RabbitMqAdminOpts -> ShowS
showsPrec :: Int -> RabbitMqAdminOpts -> ShowS
$cshow :: RabbitMqAdminOpts -> FilePath
show :: RabbitMqAdminOpts -> FilePath
$cshowList :: [RabbitMqAdminOpts] -> ShowS
showList :: [RabbitMqAdminOpts] -> ShowS
Show)
instance FromJSON RabbitMqAdminOpts where
parseJSON :: Value -> Parser RabbitMqAdminOpts
parseJSON = FilePath
-> (Object -> Parser RabbitMqAdminOpts)
-> Value
-> Parser RabbitMqAdminOpts
forall a. FilePath -> (Object -> Parser a) -> Value -> Parser a
withObject FilePath
"RabbitMqAdminOpts" ((Object -> Parser RabbitMqAdminOpts)
-> Value -> Parser RabbitMqAdminOpts)
-> (Object -> Parser RabbitMqAdminOpts)
-> Value
-> Parser RabbitMqAdminOpts
forall a b. (a -> b) -> a -> b
$ \Object
v ->
FilePath
-> Int
-> VHost
-> Maybe RabbitMqTlsOpts
-> Int
-> RabbitMqAdminOpts
RabbitMqAdminOpts
(FilePath
-> Int
-> VHost
-> Maybe RabbitMqTlsOpts
-> Int
-> RabbitMqAdminOpts)
-> Parser FilePath
-> Parser
(Int -> VHost -> Maybe RabbitMqTlsOpts -> Int -> RabbitMqAdminOpts)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Key -> Parser FilePath
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"host"
Parser
(Int -> VHost -> Maybe RabbitMqTlsOpts -> Int -> RabbitMqAdminOpts)
-> Parser Int
-> Parser
(VHost -> Maybe RabbitMqTlsOpts -> Int -> RabbitMqAdminOpts)
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"port"
Parser (VHost -> Maybe RabbitMqTlsOpts -> Int -> RabbitMqAdminOpts)
-> Parser VHost
-> Parser (Maybe RabbitMqTlsOpts -> Int -> RabbitMqAdminOpts)
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Key -> Parser VHost
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"vHost"
Parser (Maybe RabbitMqTlsOpts -> Int -> RabbitMqAdminOpts)
-> Parser (Maybe RabbitMqTlsOpts)
-> Parser (Int -> RabbitMqAdminOpts)
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object -> Parser (Maybe RabbitMqTlsOpts)
parseTlsJson Object
v
Parser (Int -> RabbitMqAdminOpts)
-> Parser Int -> Parser RabbitMqAdminOpts
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"adminPort"
mkRabbitMqAdminClientEnvWithCreds :: RabbitMqAdminOpts -> Text -> Text -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnvWithCreds :: RabbitMqAdminOpts -> VHost -> VHost -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnvWithCreds RabbitMqAdminOpts
opts VHost
username VHost
password = do
Maybe TLSSettings
mTlsSettings <- (RabbitMqTlsOpts -> IO TLSSettings)
-> Maybe RabbitMqTlsOpts -> IO (Maybe TLSSettings)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (FilePath -> RabbitMqTlsOpts -> IO TLSSettings
mkTLSSettings RabbitMqAdminOpts
opts.host) RabbitMqAdminOpts
opts.tls
let (Scheme
protocol, ManagerSettings
managerSettings) = case Maybe TLSSettings
mTlsSettings of
Maybe TLSSettings
Nothing -> (Scheme
Servant.Http, ManagerSettings
HTTP.defaultManagerSettings)
Just TLSSettings
tlsSettings -> (Scheme
Servant.Https, TLSSettings -> Maybe SockSettings -> ManagerSettings
HTTP.mkManagerSettings TLSSettings
tlsSettings Maybe SockSettings
forall a. Maybe a
Nothing)
Manager
manager <- ManagerSettings -> IO Manager
HTTP.newManager ManagerSettings
managerSettings
let basicAuthData :: BasicAuthData
basicAuthData = ByteString -> ByteString -> BasicAuthData
Servant.BasicAuthData (VHost -> ByteString
Text.encodeUtf8 VHost
username) (VHost -> ByteString
Text.encodeUtf8 VHost
password)
clientEnv :: ClientEnv
clientEnv = Manager -> BaseUrl -> ClientEnv
Servant.mkClientEnv Manager
manager (Scheme -> FilePath -> Int -> FilePath -> BaseUrl
Servant.BaseUrl Scheme
protocol RabbitMqAdminOpts
opts.host RabbitMqAdminOpts
opts.adminPort FilePath
"")
AdminAPI (AsClientT IO) -> IO (AdminAPI (AsClientT IO))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (AdminAPI (AsClientT IO) -> IO (AdminAPI (AsClientT IO)))
-> ((((VHost -> VHost -> Bool -> Int -> Int -> IO (Page Queue))
:<|> (VHost -> VHost -> IO NoContent))
:<|> ((VHost -> IO [Connection]) :<|> (VHost -> IO NoContent)))
-> AdminAPI (AsClientT IO))
-> (((VHost -> VHost -> Bool -> Int -> Int -> IO (Page Queue))
:<|> (VHost -> VHost -> IO NoContent))
:<|> ((VHost -> IO [Connection]) :<|> (VHost -> IO NoContent)))
-> IO (AdminAPI (AsClientT IO))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (((VHost -> VHost -> Bool -> Int -> Int -> IO (Page Queue))
:<|> (VHost -> VHost -> IO NoContent))
:<|> ((VHost -> IO [Connection]) :<|> (VHost -> IO NoContent)))
-> AdminAPI (AsClientT IO)
ToServant AdminAPI (AsClientT IO) -> AdminAPI (AsClientT IO)
forall {k} (routes :: k -> *) (mode :: k).
GenericServant routes mode =>
ToServant routes mode -> routes mode
fromServant ((((VHost -> VHost -> Bool -> Int -> Int -> IO (Page Queue))
:<|> (VHost -> VHost -> IO NoContent))
:<|> ((VHost -> IO [Connection]) :<|> (VHost -> IO NoContent)))
-> IO (AdminAPI (AsClientT IO)))
-> (((VHost -> VHost -> Bool -> Int -> Int -> IO (Page Queue))
:<|> (VHost -> VHost -> IO NoContent))
:<|> ((VHost -> IO [Connection]) :<|> (VHost -> IO NoContent)))
-> IO (AdminAPI (AsClientT IO))
forall a b. (a -> b) -> a -> b
$
Proxy
((("api"
:> ("queues"
:> (Capture "vhost" VHost
:> (QueryParam' '[Required, Strict] "name" VHost
:> (QueryParam' '[Required, Strict] "use_regex" Bool
:> (QueryParam' '[Required, Strict] "page_size" Int
:> (QueryParam' '[Required, Strict] "page" Int
:> Get '[JSON] (Page Queue))))))))
:<|> ("api"
:> ("queues"
:> (Capture "vhost" VHost
:> (Capture "queue" VHost :> DeleteNoContent)))))
:<|> (("api"
:> ("vhosts"
:> (Capture "vhost" VHost
:> ("connections" :> Get '[JSON] [Connection]))))
:<|> ("api"
:> ("connections" :> (Capture "name" VHost :> DeleteNoContent)))))
-> (forall a. ClientM a -> IO a)
-> Client
ClientM
((("api"
:> ("queues"
:> (Capture "vhost" VHost
:> (QueryParam' '[Required, Strict] "name" VHost
:> (QueryParam' '[Required, Strict] "use_regex" Bool
:> (QueryParam' '[Required, Strict] "page_size" Int
:> (QueryParam' '[Required, Strict] "page" Int
:> Get '[JSON] (Page Queue))))))))
:<|> ("api"
:> ("queues"
:> (Capture "vhost" VHost
:> (Capture "queue" VHost :> DeleteNoContent)))))
:<|> (("api"
:> ("vhosts"
:> (Capture "vhost" VHost
:> ("connections" :> Get '[JSON] [Connection]))))
:<|> ("api"
:> ("connections" :> (Capture "name" VHost :> DeleteNoContent)))))
-> Client
IO
((("api"
:> ("queues"
:> (Capture "vhost" VHost
:> (QueryParam' '[Required, Strict] "name" VHost
:> (QueryParam' '[Required, Strict] "use_regex" Bool
:> (QueryParam' '[Required, Strict] "page_size" Int
:> (QueryParam' '[Required, Strict] "page" Int
:> Get '[JSON] (Page Queue))))))))
:<|> ("api"
:> ("queues"
:> (Capture "vhost" VHost
:> (Capture "queue" VHost :> DeleteNoContent)))))
:<|> (("api"
:> ("vhosts"
:> (Capture "vhost" VHost
:> ("connections" :> Get '[JSON] [Connection]))))
:<|> ("api"
:> ("connections" :> (Capture "name" VHost :> DeleteNoContent)))))
forall api (m :: * -> *) (n :: * -> *).
HasClient ClientM api =>
Proxy api -> (forall a. m a -> n a) -> Client m api -> Client n api
hoistClient
(forall t. Proxy t
forall {k} (t :: k). Proxy t
Proxy @(ToServant AdminAPI AsApi))
((ClientError -> IO a)
-> (a -> IO a) -> Either ClientError a -> IO a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ClientError -> IO a
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM a -> IO a
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either ClientError a -> IO a)
-> (ClientM a -> IO (Either ClientError a)) -> ClientM a -> IO a
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< (ClientM a -> ClientEnv -> IO (Either ClientError a))
-> ClientEnv -> ClientM a -> IO (Either ClientError a)
forall a b c. (a -> b -> c) -> b -> a -> c
flip ClientM a -> ClientEnv -> IO (Either ClientError a)
forall a. ClientM a -> ClientEnv -> IO (Either ClientError a)
runClientM ClientEnv
clientEnv)
(AdminAPI (AsClientT ClientM)
-> ToServant AdminAPI (AsClientT ClientM)
forall {k} (routes :: k -> *) (mode :: k).
GenericServant routes mode =>
routes mode -> ToServant routes mode
toServant (AdminAPI (AsClientT ClientM)
-> ToServant AdminAPI (AsClientT ClientM))
-> AdminAPI (AsClientT ClientM)
-> ToServant AdminAPI (AsClientT ClientM)
forall a b. (a -> b) -> a -> b
$ BasicAuthData -> AdminAPI (AsClientT ClientM)
adminClient BasicAuthData
basicAuthData)
mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv RabbitMqAdminOpts
opts = IO (VHost, VHost)
readCredsFromEnv IO (VHost, VHost)
-> ((VHost, VHost) -> IO (AdminAPI (AsClientT IO)))
-> IO (AdminAPI (AsClientT IO))
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (VHost -> VHost -> IO (AdminAPI (AsClientT IO)))
-> (VHost, VHost) -> IO (AdminAPI (AsClientT IO))
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry (RabbitMqAdminOpts -> VHost -> VHost -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnvWithCreds RabbitMqAdminOpts
opts)
data AmqpEndpoint = AmqpEndpoint
{ AmqpEndpoint -> FilePath
host :: !String,
AmqpEndpoint -> Int
port :: !Int,
AmqpEndpoint -> VHost
vHost :: !Text,
AmqpEndpoint -> Maybe RabbitMqTlsOpts
tls :: !(Maybe RabbitMqTlsOpts)
}
deriving (AmqpEndpoint -> AmqpEndpoint -> Bool
(AmqpEndpoint -> AmqpEndpoint -> Bool)
-> (AmqpEndpoint -> AmqpEndpoint -> Bool) -> Eq AmqpEndpoint
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: AmqpEndpoint -> AmqpEndpoint -> Bool
== :: AmqpEndpoint -> AmqpEndpoint -> Bool
$c/= :: AmqpEndpoint -> AmqpEndpoint -> Bool
/= :: AmqpEndpoint -> AmqpEndpoint -> Bool
Eq, Int -> AmqpEndpoint -> ShowS
[AmqpEndpoint] -> ShowS
AmqpEndpoint -> FilePath
(Int -> AmqpEndpoint -> ShowS)
-> (AmqpEndpoint -> FilePath)
-> ([AmqpEndpoint] -> ShowS)
-> Show AmqpEndpoint
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> AmqpEndpoint -> ShowS
showsPrec :: Int -> AmqpEndpoint -> ShowS
$cshow :: AmqpEndpoint -> FilePath
show :: AmqpEndpoint -> FilePath
$cshowList :: [AmqpEndpoint] -> ShowS
showList :: [AmqpEndpoint] -> ShowS
Show)
instance FromJSON AmqpEndpoint where
parseJSON :: Value -> Parser AmqpEndpoint
parseJSON = FilePath
-> (Object -> Parser AmqpEndpoint) -> Value -> Parser AmqpEndpoint
forall a. FilePath -> (Object -> Parser a) -> Value -> Parser a
withObject FilePath
"AmqpEndpoint" ((Object -> Parser AmqpEndpoint) -> Value -> Parser AmqpEndpoint)
-> (Object -> Parser AmqpEndpoint) -> Value -> Parser AmqpEndpoint
forall a b. (a -> b) -> a -> b
$ \Object
v ->
FilePath -> Int -> VHost -> Maybe RabbitMqTlsOpts -> AmqpEndpoint
AmqpEndpoint
(FilePath -> Int -> VHost -> Maybe RabbitMqTlsOpts -> AmqpEndpoint)
-> Parser FilePath
-> Parser (Int -> VHost -> Maybe RabbitMqTlsOpts -> AmqpEndpoint)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
v Object -> Key -> Parser FilePath
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"host"
Parser (Int -> VHost -> Maybe RabbitMqTlsOpts -> AmqpEndpoint)
-> Parser Int
-> Parser (VHost -> Maybe RabbitMqTlsOpts -> AmqpEndpoint)
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"port"
Parser (VHost -> Maybe RabbitMqTlsOpts -> AmqpEndpoint)
-> Parser VHost -> Parser (Maybe RabbitMqTlsOpts -> AmqpEndpoint)
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
v Object -> Key -> Parser VHost
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"vHost"
Parser (Maybe RabbitMqTlsOpts -> AmqpEndpoint)
-> Parser (Maybe RabbitMqTlsOpts) -> Parser AmqpEndpoint
forall a b. Parser (a -> b) -> Parser a -> Parser b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object -> Parser (Maybe RabbitMqTlsOpts)
parseTlsJson Object
v
demoteOpts :: RabbitMqAdminOpts -> AmqpEndpoint
demoteOpts :: RabbitMqAdminOpts -> AmqpEndpoint
demoteOpts RabbitMqAdminOpts {Int
FilePath
Maybe RabbitMqTlsOpts
VHost
$sel:host:RabbitMqAdminOpts :: RabbitMqAdminOpts -> FilePath
$sel:port:RabbitMqAdminOpts :: RabbitMqAdminOpts -> Int
$sel:vHost:RabbitMqAdminOpts :: RabbitMqAdminOpts -> VHost
$sel:tls:RabbitMqAdminOpts :: RabbitMqAdminOpts -> Maybe RabbitMqTlsOpts
$sel:adminPort:RabbitMqAdminOpts :: RabbitMqAdminOpts -> Int
host :: FilePath
port :: Int
vHost :: VHost
tls :: Maybe RabbitMqTlsOpts
adminPort :: Int
..} = AmqpEndpoint {Int
FilePath
Maybe RabbitMqTlsOpts
VHost
$sel:host:AmqpEndpoint :: FilePath
$sel:port:AmqpEndpoint :: Int
$sel:vHost:AmqpEndpoint :: VHost
$sel:tls:AmqpEndpoint :: Maybe RabbitMqTlsOpts
host :: FilePath
port :: Int
vHost :: VHost
tls :: Maybe RabbitMqTlsOpts
..}
mkRabbitMqChannelMVar :: Logger -> AmqpEndpoint -> IO (MVar Q.Channel)
mkRabbitMqChannelMVar :: Logger -> AmqpEndpoint -> IO (MVar Channel)
mkRabbitMqChannelMVar Logger
l AmqpEndpoint
opts = do
MVar Channel
chanMVar <- IO (MVar Channel)
forall (m :: * -> *) a. MonadIO m => m (MVar a)
newEmptyMVar
Async ()
connThread <-
IO () -> IO (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (IO () -> IO (Async ()))
-> (RabbitMqHooks IO -> IO ()) -> RabbitMqHooks IO -> IO (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Logger -> AmqpEndpoint -> RabbitMqHooks IO -> IO ()
forall (m :: * -> *).
(MonadIO m, MonadMask m, MonadBaseControl IO m) =>
Logger -> AmqpEndpoint -> RabbitMqHooks m -> m ()
openConnectionWithRetries Logger
l AmqpEndpoint
opts (RabbitMqHooks IO -> IO (Async ()))
-> RabbitMqHooks IO -> IO (Async ())
forall a b. (a -> b) -> a -> b
$
RabbitMqHooks
{ $sel:onNewChannel:RabbitMqHooks :: Channel -> IO ()
onNewChannel = \Channel
conn -> MVar Channel -> Channel -> IO ()
forall (m :: * -> *) a. MonadIO m => MVar a -> a -> m ()
putMVar MVar Channel
chanMVar Channel
conn IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Int -> IO ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay Int
forall a. Bounded a => a
maxBound),
$sel:onChannelException:RabbitMqHooks :: SomeException -> IO ()
onChannelException = \SomeException
_ -> IO (Maybe Channel) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe Channel) -> IO ()) -> IO (Maybe Channel) -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar Channel -> IO (Maybe Channel)
forall (m :: * -> *) a. MonadIO m => MVar a -> m (Maybe a)
tryTakeMVar MVar Channel
chanMVar,
$sel:onConnectionClose:RabbitMqHooks :: IO ()
onConnectionClose = IO (Maybe Channel) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Maybe Channel) -> IO ()) -> IO (Maybe Channel) -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar Channel -> IO (Maybe Channel)
forall (m :: * -> *) a. MonadIO m => MVar a -> m (Maybe a)
tryTakeMVar MVar Channel
chanMVar
}
Async ()
waitForConnThread <- IO () -> IO (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ MVar Channel -> (Channel -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
MVar a -> (a -> m b) -> m b
withMVar MVar Channel
chanMVar ((Channel -> IO ()) -> IO ()) -> (Channel -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Channel
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Async () -> Async () -> IO (Either () ())
forall (m :: * -> *) a b.
MonadIO m =>
Async a -> Async b -> m (Either a b)
waitEither Async ()
connThread Async ()
waitForConnThread IO (Either () ())
-> (Either () () -> IO (MVar Channel)) -> IO (MVar Channel)
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left () -> RabbitMqConnectionError -> IO (MVar Channel)
forall e a. Exception e => e -> IO a
throwIO (RabbitMqConnectionError -> IO (MVar Channel))
-> RabbitMqConnectionError -> IO (MVar Channel)
forall a b. (a -> b) -> a -> b
$ FilePath -> RabbitMqConnectionError
RabbitMqConnectionFailed FilePath
"connection thread finished before getting connection"
Right () -> MVar Channel -> IO (MVar Channel)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure MVar Channel
chanMVar
data RabbitMqConnectionError = RabbitMqConnectionFailed String
deriving (Int -> RabbitMqConnectionError -> ShowS
[RabbitMqConnectionError] -> ShowS
RabbitMqConnectionError -> FilePath
(Int -> RabbitMqConnectionError -> ShowS)
-> (RabbitMqConnectionError -> FilePath)
-> ([RabbitMqConnectionError] -> ShowS)
-> Show RabbitMqConnectionError
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RabbitMqConnectionError -> ShowS
showsPrec :: Int -> RabbitMqConnectionError -> ShowS
$cshow :: RabbitMqConnectionError -> FilePath
show :: RabbitMqConnectionError -> FilePath
$cshowList :: [RabbitMqConnectionError] -> ShowS
showList :: [RabbitMqConnectionError] -> ShowS
Show)
instance Exception RabbitMqConnectionError
withConnection ::
forall m a.
(MonadIO m, MonadMask m) =>
Logger ->
AmqpEndpoint ->
(Q.Connection -> m a) ->
m a
withConnection :: forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
Logger -> AmqpEndpoint -> (Connection -> m a) -> m a
withConnection Logger
l AmqpEndpoint {Int
FilePath
Maybe RabbitMqTlsOpts
VHost
$sel:host:AmqpEndpoint :: AmqpEndpoint -> FilePath
$sel:port:AmqpEndpoint :: AmqpEndpoint -> Int
$sel:vHost:AmqpEndpoint :: AmqpEndpoint -> VHost
$sel:tls:AmqpEndpoint :: AmqpEndpoint -> Maybe RabbitMqTlsOpts
host :: FilePath
port :: Int
vHost :: VHost
tls :: Maybe RabbitMqTlsOpts
..} Connection -> m a
k = do
let policy :: RetryPolicyM m
policy = Int -> RetryPolicyM m -> RetryPolicyM m
forall (m :: * -> *).
Monad m =>
Int -> RetryPolicyM m -> RetryPolicyM m
limitRetriesByCumulativeDelay Int
1_000_000 (RetryPolicyM m -> RetryPolicyM m)
-> RetryPolicyM m -> RetryPolicyM m
forall a b. (a -> b) -> a -> b
$ Int -> RetryPolicyM m
forall (m :: * -> *). MonadIO m => Int -> RetryPolicyM m
fullJitterBackoff Int
1000
logError :: Bool -> SomeException -> RetryStatus -> m ()
logError Bool
willRetry SomeException
e RetryStatus
retryStatus = do
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.err Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$
Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Failed to connect to RabbitMQ")
(Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> FilePath -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"error" (forall e. Exception e => e -> FilePath
displayException @SomeException SomeException
e)
(Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"willRetry" Bool
willRetry
(Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"retryCount" RetryStatus
retryStatus.rsIterNumber
getConn :: m Connection
getConn =
RetryPolicyM m
-> [RetryStatus -> Handler m Bool]
-> (RetryStatus -> m Connection)
-> m Connection
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m
-> [RetryStatus -> Handler m Bool] -> (RetryStatus -> m a) -> m a
recovering
RetryPolicyM m
policy
( [RetryStatus -> Handler m Bool]
forall (m :: * -> *). MonadIO m => [RetryStatus -> Handler m Bool]
skipAsyncExceptions
[RetryStatus -> Handler m Bool]
-> [RetryStatus -> Handler m Bool]
-> [RetryStatus -> Handler m Bool]
forall a. Semigroup a => a -> a -> a
<> [(SomeException -> m Bool)
-> (Bool -> SomeException -> RetryStatus -> m ())
-> RetryStatus
-> Handler m Bool
forall (m :: * -> *) e.
(Monad m, Exception e) =>
(e -> m Bool)
-> (Bool -> e -> RetryStatus -> m ())
-> RetryStatus
-> Handler m Bool
logRetries (m Bool -> SomeException -> m Bool
forall a b. a -> b -> a
const (m Bool -> SomeException -> m Bool)
-> m Bool -> SomeException -> m Bool
forall a b. (a -> b) -> a -> b
$ Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True) Bool -> SomeException -> RetryStatus -> m ()
logError]
)
( m Connection -> RetryStatus -> m Connection
forall a b. a -> b -> a
const (m Connection -> RetryStatus -> m Connection)
-> m Connection -> RetryStatus -> m Connection
forall a b. (a -> b) -> a -> b
$ do
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.info Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Trying to connect to RabbitMQ")
ConnectionOpts
connOpts <- AmqpEndpoint -> m ConnectionOpts
forall (m :: * -> *). MonadIO m => AmqpEndpoint -> m ConnectionOpts
mkConnectionOpts AmqpEndpoint {Int
FilePath
Maybe RabbitMqTlsOpts
VHost
$sel:host:AmqpEndpoint :: FilePath
$sel:port:AmqpEndpoint :: Int
$sel:vHost:AmqpEndpoint :: VHost
$sel:tls:AmqpEndpoint :: Maybe RabbitMqTlsOpts
host :: FilePath
port :: Int
vHost :: VHost
tls :: Maybe RabbitMqTlsOpts
..}
IO Connection -> m Connection
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Connection -> m Connection) -> IO Connection -> m Connection
forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> IO Connection
Q.openConnection'' ConnectionOpts
connOpts
)
m Connection -> (Connection -> m ()) -> (Connection -> m a) -> m a
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket m Connection
getConn (IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (Connection -> IO ()) -> Connection -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> IO ()
Q.closeConnection) Connection -> m a
k
mkConnectionOpts :: (MonadIO m) => AmqpEndpoint -> m Q.ConnectionOpts
mkConnectionOpts :: forall (m :: * -> *). MonadIO m => AmqpEndpoint -> m ConnectionOpts
mkConnectionOpts AmqpEndpoint {Int
FilePath
Maybe RabbitMqTlsOpts
VHost
$sel:host:AmqpEndpoint :: AmqpEndpoint -> FilePath
$sel:port:AmqpEndpoint :: AmqpEndpoint -> Int
$sel:vHost:AmqpEndpoint :: AmqpEndpoint -> VHost
$sel:tls:AmqpEndpoint :: AmqpEndpoint -> Maybe RabbitMqTlsOpts
host :: FilePath
port :: Int
vHost :: VHost
tls :: Maybe RabbitMqTlsOpts
..} = do
Maybe TLSSettings
mTlsSettings <- (RabbitMqTlsOpts -> m TLSSettings)
-> Maybe RabbitMqTlsOpts -> m (Maybe TLSSettings)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse (IO TLSSettings -> m TLSSettings
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO TLSSettings -> m TLSSettings)
-> (RabbitMqTlsOpts -> IO TLSSettings)
-> RabbitMqTlsOpts
-> m TLSSettings
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (FilePath -> RabbitMqTlsOpts -> IO TLSSettings
mkTLSSettings FilePath
host)) Maybe RabbitMqTlsOpts
tls
(VHost
username, VHost
password) <- IO (VHost, VHost) -> m (VHost, VHost)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (VHost, VHost) -> m (VHost, VHost))
-> IO (VHost, VHost) -> m (VHost, VHost)
forall a b. (a -> b) -> a -> b
$ IO (VHost, VHost)
readCredsFromEnv
ConnectionOpts -> m ConnectionOpts
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
ConnectionOpts
Q.defaultConnectionOpts
{ Q.coServers = [(host, fromIntegral port)],
Q.coVHost = vHost,
Q.coAuth = [Q.plain username password],
Q.coTLSSettings = fmap Q.TLSCustom mTlsSettings
}
openConnectionWithRetries ::
forall m.
(MonadIO m, MonadMask m, MonadBaseControl IO m) =>
Logger ->
AmqpEndpoint ->
RabbitMqHooks m ->
m ()
openConnectionWithRetries :: forall (m :: * -> *).
(MonadIO m, MonadMask m, MonadBaseControl IO m) =>
Logger -> AmqpEndpoint -> RabbitMqHooks m -> m ()
openConnectionWithRetries Logger
l AmqpEndpoint {Int
FilePath
Maybe RabbitMqTlsOpts
VHost
$sel:host:AmqpEndpoint :: AmqpEndpoint -> FilePath
$sel:port:AmqpEndpoint :: AmqpEndpoint -> Int
$sel:vHost:AmqpEndpoint :: AmqpEndpoint -> VHost
$sel:tls:AmqpEndpoint :: AmqpEndpoint -> Maybe RabbitMqTlsOpts
host :: FilePath
port :: Int
vHost :: VHost
tls :: Maybe RabbitMqTlsOpts
..} RabbitMqHooks m
hooks = do
(VHost
username, VHost
password) <- IO (VHost, VHost) -> m (VHost, VHost)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (VHost, VHost) -> m (VHost, VHost))
-> IO (VHost, VHost) -> m (VHost, VHost)
forall a b. (a -> b) -> a -> b
$ IO (VHost, VHost)
readCredsFromEnv
VHost -> VHost -> m ()
connectWithRetries VHost
username VHost
password
where
connectWithRetries :: Text -> Text -> m ()
connectWithRetries :: VHost -> VHost -> m ()
connectWithRetries VHost
username VHost
password = do
let policy :: RetryPolicyM m
policy = Int -> RetryPolicyM m -> RetryPolicyM m
forall (m :: * -> *).
Monad m =>
Int -> RetryPolicyM m -> RetryPolicyM m
capDelay Int
5_000_000 (RetryPolicyM m -> RetryPolicyM m)
-> RetryPolicyM m -> RetryPolicyM m
forall a b. (a -> b) -> a -> b
$ Int -> RetryPolicyM m
forall (m :: * -> *). MonadIO m => Int -> RetryPolicyM m
fullJitterBackoff Int
1000
logError :: Bool -> SomeException -> RetryStatus -> m ()
logError Bool
willRetry SomeException
e RetryStatus
retryStatus = do
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.err Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$
Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Failed to connect to RabbitMQ")
(Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> FilePath -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"error" (forall e. Exception e => e -> FilePath
displayException @SomeException SomeException
e)
(Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"willRetry" Bool
willRetry
(Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Int -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"retryCount" RetryStatus
retryStatus.rsIterNumber
getConn :: m Connection
getConn =
RetryPolicyM m
-> [RetryStatus -> Handler m Bool]
-> (RetryStatus -> m Connection)
-> m Connection
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m
-> [RetryStatus -> Handler m Bool] -> (RetryStatus -> m a) -> m a
recovering
RetryPolicyM m
policy
( [RetryStatus -> Handler m Bool]
forall (m :: * -> *). MonadIO m => [RetryStatus -> Handler m Bool]
skipAsyncExceptions
[RetryStatus -> Handler m Bool]
-> [RetryStatus -> Handler m Bool]
-> [RetryStatus -> Handler m Bool]
forall a. Semigroup a => a -> a -> a
<> [(SomeException -> m Bool)
-> (Bool -> SomeException -> RetryStatus -> m ())
-> RetryStatus
-> Handler m Bool
forall (m :: * -> *) e.
(Monad m, Exception e) =>
(e -> m Bool)
-> (Bool -> e -> RetryStatus -> m ())
-> RetryStatus
-> Handler m Bool
logRetries (m Bool -> SomeException -> m Bool
forall a b. a -> b -> a
const (m Bool -> SomeException -> m Bool)
-> m Bool -> SomeException -> m Bool
forall a b. (a -> b) -> a -> b
$ Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True) Bool -> SomeException -> RetryStatus -> m ()
logError]
)
( m Connection -> RetryStatus -> m Connection
forall a b. a -> b -> a
const (m Connection -> RetryStatus -> m Connection)
-> m Connection -> RetryStatus -> m Connection
forall a b. (a -> b) -> a -> b
$ do
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.info Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Trying to connect to RabbitMQ")
ConnectionOpts
connOpts <- AmqpEndpoint -> m ConnectionOpts
forall (m :: * -> *). MonadIO m => AmqpEndpoint -> m ConnectionOpts
mkConnectionOpts AmqpEndpoint {Int
FilePath
Maybe RabbitMqTlsOpts
VHost
$sel:host:AmqpEndpoint :: FilePath
$sel:port:AmqpEndpoint :: Int
$sel:vHost:AmqpEndpoint :: VHost
$sel:tls:AmqpEndpoint :: Maybe RabbitMqTlsOpts
host :: FilePath
port :: Int
vHost :: VHost
tls :: Maybe RabbitMqTlsOpts
..}
IO Connection -> m Connection
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Connection -> m Connection) -> IO Connection -> m Connection
forall a b. (a -> b) -> a -> b
$ ConnectionOpts -> IO Connection
Q.openConnection'' ConnectionOpts
connOpts
)
m Connection
-> (Connection -> m ()) -> (Connection -> m ()) -> m ()
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket m Connection
getConn (IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (Connection -> IO ()) -> Connection -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> IO ()
Q.closeConnection) ((Connection -> m ()) -> m ()) -> (Connection -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Connection
conn -> do
(RunInBase m IO -> IO ()) -> m ()
forall a. (RunInBase m IO -> IO a) -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
(RunInBase m b -> b a) -> m a
liftBaseWith ((RunInBase m IO -> IO ()) -> m ())
-> (RunInBase m IO -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \RunInBase m IO
runInIO ->
Connection -> Bool -> IO () -> IO ()
Q.addConnectionClosedHandler Connection
conn Bool
True (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO (StM m ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (StM m ()) -> IO ()) -> IO (StM m ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> IO (StM m ())
RunInBase m IO
runInIO (m () -> IO (StM m ())) -> m () -> IO (StM m ())
forall a b. (a -> b) -> a -> b
$ do
RabbitMqHooks m
hooks.onConnectionClose
m () -> (SomeException -> m ()) -> m ()
forall e a. (HasCallStack, Exception e) => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
`catch` Logger -> FilePath -> SomeException -> m ()
forall (m :: * -> *).
MonadIO m =>
Logger -> FilePath -> SomeException -> m ()
logException Logger
l FilePath
"onConnectionClose hook threw an exception, reconnecting to RabbitMQ anyway"
VHost -> VHost -> m ()
connectWithRetries VHost
username VHost
password
Connection -> m ()
openChan Connection
conn
openChan :: Q.Connection -> m ()
openChan :: Connection -> m ()
openChan Connection
conn = do
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.info Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Opening channel with RabbitMQ")
Channel
chan <- IO Channel -> m Channel
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Channel -> m Channel) -> IO Channel -> m Channel
forall a b. (a -> b) -> a -> b
$ Connection -> IO Channel
Q.openChannel Connection
conn
(RunInBase m IO -> IO ()) -> m ()
forall a. (RunInBase m IO -> IO a) -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
(RunInBase m b -> b a) -> m a
liftBaseWith ((RunInBase m IO -> IO ()) -> m ())
-> (RunInBase m IO -> IO ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \RunInBase m IO
runInIO ->
Channel -> (SomeException -> IO ()) -> IO ()
Q.addChannelExceptionHandler Channel
chan (IO (StM m ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (StM m ()) -> IO ())
-> (SomeException -> IO (StM m ())) -> SomeException -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> IO (StM m ())
RunInBase m IO
runInIO (m () -> IO (StM m ()))
-> (SomeException -> m ()) -> SomeException -> IO (StM m ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> SomeException -> m ()
chanExceptionHandler Connection
conn)
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.info Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$ Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"RabbitMQ channel opened")
RabbitMqHooks m
hooks.onNewChannel Channel
chan
chanExceptionHandler :: Q.Connection -> SomeException -> m ()
chanExceptionHandler :: Connection -> SomeException -> m ()
chanExceptionHandler Connection
conn SomeException
e = do
RabbitMqHooks m
hooks.onChannelException SomeException
e m () -> (SomeException -> m ()) -> m ()
forall e a. (HasCallStack, Exception e) => m a -> (e -> m a) -> m a
forall (m :: * -> *) e a.
(MonadCatch m, HasCallStack, Exception e) =>
m a -> (e -> m a) -> m a
`catch` Logger -> FilePath -> SomeException -> m ()
forall (m :: * -> *).
MonadIO m =>
Logger -> FilePath -> SomeException -> m ()
logException Logger
l FilePath
"onChannelException hook threw an exception"
case (SomeException -> Bool
Q.isNormalChannelClose SomeException
e, SomeException -> Maybe AMQPException
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e) of
(Bool
True, Maybe AMQPException
_) ->
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.info Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$
Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"RabbitMQ channel is closed normally, not attempting to reopen channel")
(Bool
_, Just (Q.ConnectionClosedException {})) ->
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.info Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$
Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"RabbitMQ connection is closed, not attempting to reopen channel")
(Bool, Maybe AMQPException)
_ -> do
Logger -> FilePath -> SomeException -> m ()
forall (m :: * -> *).
MonadIO m =>
Logger -> FilePath -> SomeException -> m ()
logException Logger
l FilePath
"RabbitMQ channel closed" SomeException
e
Connection -> m ()
openChan Connection
conn
mkTLSSettings :: HostName -> RabbitMqTlsOpts -> IO TLSSettings
mkTLSSettings :: FilePath -> RabbitMqTlsOpts -> IO TLSSettings
mkTLSSettings FilePath
host RabbitMqTlsOpts
opts = do
Maybe (Shared -> Shared)
setCAStore <- MaybeT IO (Shared -> Shared) -> IO (Maybe (Shared -> Shared))
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT (MaybeT IO (Shared -> Shared) -> IO (Maybe (Shared -> Shared)))
-> MaybeT IO (Shared -> Shared) -> IO (Maybe (Shared -> Shared))
forall a b. (a -> b) -> a -> b
$ do
FilePath
path <- MaybeT IO FilePath
-> (FilePath -> MaybeT IO FilePath)
-> Maybe FilePath
-> MaybeT IO FilePath
forall b a. b -> (a -> b) -> Maybe a -> b
maybe MaybeT IO FilePath
forall a. MaybeT IO a
forall (m :: * -> *) a. MonadPlus m => m a
mzero FilePath -> MaybeT IO FilePath
forall a. a -> MaybeT IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure RabbitMqTlsOpts
opts.caCert
CertificateStore
store <- IO (Maybe CertificateStore) -> MaybeT IO CertificateStore
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (IO (Maybe CertificateStore) -> MaybeT IO CertificateStore)
-> IO (Maybe CertificateStore) -> MaybeT IO CertificateStore
forall a b. (a -> b) -> a -> b
$ FilePath -> IO (Maybe CertificateStore)
X509.readCertificateStore FilePath
path
(Shared -> Shared) -> MaybeT IO (Shared -> Shared)
forall a. a -> MaybeT IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((Shared -> Shared) -> MaybeT IO (Shared -> Shared))
-> (Shared -> Shared) -> MaybeT IO (Shared -> Shared)
forall a b. (a -> b) -> a -> b
$ \Shared
shared -> Shared
shared {sharedCAStore = store}
let setHooks :: ClientHooks -> ClientHooks
setHooks =
if RabbitMqTlsOpts
opts.insecureSkipVerifyTls
then \ClientHooks
h -> ClientHooks
h {onServerCertificate = \CertificateStore
_ ValidationCache
_ ServiceID
_ CertificateChain
_ -> [FailedReason] -> IO [FailedReason]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []}
else ClientHooks -> ClientHooks
forall a. a -> a
id
TLSSettings -> IO TLSSettings
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TLSSettings -> IO TLSSettings) -> TLSSettings -> IO TLSSettings
forall a b. (a -> b) -> a -> b
$
ClientParams -> TLSSettings
TLSSettings
(FilePath -> ByteString -> ClientParams
defaultParamsClient FilePath
host ByteString
"rabbitmq")
{ clientShared = fromMaybe id setCAStore def,
clientHooks = setHooks def,
clientSupported =
def
{ supportedVersions = [TLS13, TLS12],
supportedCiphers = ciphersuite_strong
}
}
logException :: (MonadIO m) => Logger -> String -> SomeException -> m ()
logException :: forall (m :: * -> *).
MonadIO m =>
Logger -> FilePath -> SomeException -> m ()
logException Logger
l FilePath
m (SomeException e
e) = do
Logger -> (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadIO m => Logger -> (Msg -> Msg) -> m ()
Log.err Logger
l ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$
FilePath -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg FilePath
m
(Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> FilePath -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"error" (e -> FilePath
forall e. Exception e => e -> FilePath
displayException e
e)
readCredsFromEnv :: IO (Text, Text)
readCredsFromEnv :: IO (VHost, VHost)
readCredsFromEnv =
(,)
(VHost -> VHost -> (VHost, VHost))
-> IO VHost -> IO (VHost -> (VHost, VHost))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (FilePath -> VHost
Text.pack (FilePath -> VHost) -> IO FilePath -> IO VHost
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> FilePath -> IO FilePath
forall (m :: * -> *). MonadIO m => FilePath -> m FilePath
getEnv FilePath
"RABBITMQ_USERNAME")
IO (VHost -> (VHost, VHost)) -> IO VHost -> IO (VHost, VHost)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (FilePath -> VHost
Text.pack (FilePath -> VHost) -> IO FilePath -> IO VHost
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> FilePath -> IO FilePath
forall (m :: * -> *). MonadIO m => FilePath -> m FilePath
getEnv FilePath
"RABBITMQ_PASSWORD")