{-# LANGUAGE RecordWildCards #-}
module Network.AMQP.Extended
( RabbitMqHooks (..),
RabbitMqAdminOpts (..),
AmqpEndpoint (..),
openConnectionWithRetries,
mkRabbitMqAdminClientEnv,
mkRabbitMqAdminClientEnvWithCreds,
mkRabbitMqChannelMVar,
demoteOpts,
RabbitMqTlsOpts (..),
mkConnectionOpts,
mkTLSSettings,
readCredsFromEnv,
)
where
import Control.Exception (AsyncException, throwIO)
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Control.Monad.Trans.Maybe
import Control.Retry
import Data.Aeson
import Data.Aeson.Types
import Data.Default
import Data.Proxy
import Data.Text qualified as Text
import Data.Text.Encoding qualified as Text
import Data.X509.CertificateStore qualified as X509
import Imports
import Network.AMQP qualified as Q
import Network.Connection as Conn
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Client.TLS qualified as HTTP
import Network.RabbitMqAdmin
import Network.TLS
import Network.TLS.Extra.Cipher
import Servant hiding (Handler)
import Servant.Client
import Servant.Client qualified as Servant
import System.Logger (Logger)
import System.Logger qualified as Log
import UnliftIO.Async
-- | Callbacks invoked by 'openConnectionWithRetries' over the lifetime of a
-- RabbitMQ connection. All hooks run in the caller-chosen monad @m@.
data RabbitMqHooks m = RabbitMqHooks
  { -- | Run with every freshly opened channel, after the channel exception
    -- handler has been registered on it.
    onNewChannel :: Q.Channel -> m (),
    -- | Run from the connection-closed handler, before a reconnect is
    -- attempted. An exception thrown here is logged and does not prevent
    -- the reconnect.
    onConnectionClose :: m (),
    -- | Run first whenever the channel exception handler fires. An
    -- exception thrown here is logged and otherwise ignored.
    onChannelException :: SomeException -> m ()
  }
-- | TLS options shared by the AMQP connection and the admin HTTP client.
data RabbitMqTlsOpts = RabbitMqTlsOpts
  { -- | Optional path to a certificate store; when it can be read it
    -- replaces the CA store of the TLS client parameters (see
    -- 'mkTLSSettings').
    caCert :: !(Maybe FilePath),
    -- | When 'True', the server-certificate hook is replaced by one that
    -- reports no validation failures (see 'mkTLSSettings').
    insecureSkipVerifyTls :: Bool
  }
  deriving (Eq, Show)
-- | Parse the TLS-related keys of a configuration object.
--
-- The optional boolean key @enableTls@ (default 'False') gates everything
-- else: when it is false the result is 'Nothing' and the other keys are not
-- looked at. When it is true, @caCert@ (optional) and
-- @insecureSkipVerifyTls@ (optional, default 'False') are read.
parseTlsJson :: Object -> Parser (Maybe RabbitMqTlsOpts)
parseTlsJson v = do
  enabled <- v .:? "enableTls" .!= False
  if enabled
    then
      Just
        <$> ( RabbitMqTlsOpts
                <$> v .:? "caCert"
                <*> v .:? "insecureSkipVerifyTls" .!= False
            )
    else pure Nothing
-- | Connection settings for both the AMQP endpoint and the RabbitMQ admin
-- (HTTP) API. Use 'demoteOpts' to drop the admin-only fields.
data RabbitMqAdminOpts = RabbitMqAdminOpts
  { -- | AMQP host.
    host :: !String,
    -- | AMQP port.
    port :: !Int,
    -- | Virtual host.
    vHost :: !Text,
    -- | TLS options; 'Nothing' means plain (non-TLS) connections.
    tls :: Maybe RabbitMqTlsOpts,
    -- | Host of the admin HTTP API.
    adminHost :: !String,
    -- | Port of the admin HTTP API.
    adminPort :: !Int
  }
  deriving (Eq, Show)
-- | Reads the required keys @host@, @port@, @vHost@, @adminHost@ and
-- @adminPort@, plus the TLS keys handled by 'parseTlsJson', all from the
-- same JSON object.
instance FromJSON RabbitMqAdminOpts where
  parseJSON = withObject "RabbitMqAdminOpts" $ \v ->
    RabbitMqAdminOpts
      <$> v .: "host"
      <*> v .: "port"
      <*> v .: "vHost"
      <*> parseTlsJson v
      <*> v .: "adminHost"
      <*> v .: "adminPort"
-- | Build a client for the RabbitMQ admin API that runs directly in 'IO',
-- authenticating with the given username and password via HTTP basic auth.
--
-- The scheme is HTTPS when TLS options are configured and HTTP otherwise.
-- Client errors returned by 'runClientM' are rethrown with 'throwM'.
mkRabbitMqAdminClientEnvWithCreds :: RabbitMqAdminOpts -> Text -> Text -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnvWithCreds opts username password = do
  -- NOTE(review): the TLS parameters are built from @opts.host@ while the
  -- HTTP client below connects to @opts.adminHost@ — confirm this is
  -- intended for deployments where the two differ.
  mTlsSettings <- traverse (mkTLSSettings opts.host) opts.tls
  let (protocol, managerSettings) = case mTlsSettings of
        Nothing -> (Servant.Http, HTTP.defaultManagerSettings)
        Just tlsSettings -> (Servant.Https, HTTP.mkManagerSettings tlsSettings Nothing)
  manager <- HTTP.newManager managerSettings
  let basicAuthData = Servant.BasicAuthData (Text.encodeUtf8 username) (Text.encodeUtf8 password)
      clientEnv = Servant.mkClientEnv manager (Servant.BaseUrl protocol opts.adminHost opts.adminPort "")
  pure . fromServant $
    hoistClient
      (Proxy @(ToServant AdminAPI AsApi))
      -- Run each request against 'clientEnv', turning a 'Left' into an
      -- exception so the resulting client lives in plain 'IO'.
      (either throwM pure <=< flip runClientM clientEnv)
      (toServant $ adminClient basicAuthData)
-- | Like 'mkRabbitMqAdminClientEnvWithCreds', with the credentials taken
-- from the environment via 'readCredsFromEnv'.
mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts =
  readCredsFromEnv >>= uncurry (mkRabbitMqAdminClientEnvWithCreds opts)
-- | Settings needed to reach a RabbitMQ broker over AMQP.
data AmqpEndpoint = AmqpEndpoint
  { -- | AMQP host.
    host :: !String,
    -- | AMQP port.
    port :: !Int,
    -- | Virtual host.
    vHost :: !Text,
    -- | TLS options; 'Nothing' means a plain (non-TLS) connection.
    tls :: !(Maybe RabbitMqTlsOpts)
  }
  deriving (Eq, Show)
-- | Reads the required keys @host@, @port@ and @vHost@, plus the TLS keys
-- handled by 'parseTlsJson', all from the same JSON object.
instance FromJSON AmqpEndpoint where
  parseJSON = withObject "AmqpEndpoint" $ \v ->
    AmqpEndpoint
      <$> v .: "host"
      <*> v .: "port"
      <*> v .: "vHost"
      <*> parseTlsJson v
-- | Forget the admin-API fields of 'RabbitMqAdminOpts', keeping @host@,
-- @port@, @vHost@ and @tls@ (copied by name through record wildcards).
demoteOpts :: RabbitMqAdminOpts -> AmqpEndpoint
demoteOpts RabbitMqAdminOpts {..} = AmqpEndpoint {..}
-- | Start a background connection (via 'openConnectionWithRetries') and
-- return an 'MVar' that holds the current channel.
--
-- The 'MVar' is filled whenever a new channel is opened and emptied again on
-- a channel exception or when the connection closes. This function returns
-- once the 'MVar' has been filled for the first time; it throws
-- 'RabbitMqConnectionFailed' if the connection thread finishes before that.
mkRabbitMqChannelMVar :: Logger -> Maybe Text -> AmqpEndpoint -> IO (MVar Q.Channel)
mkRabbitMqChannelMVar l connName opts = do
  chanMVar <- newEmptyMVar
  connThread <-
    async . openConnectionWithRetries l opts connName $
      RabbitMqHooks
        { -- Publish the channel, then block forever: 'onNewChannel' is the
          -- last action inside the bracket of 'openConnectionWithRetries',
          -- so returning from it would release (close) the connection.
          onNewChannel = \conn -> putMVar chanMVar conn >> forever (threadDelay maxBound),
          onChannelException = \_ -> void $ tryTakeMVar chanMVar,
          onConnectionClose = void $ tryTakeMVar chanMVar
        }
  -- Finishes as soon as the MVar holds a channel (without removing it).
  waitForConnThread <- async $ withMVar chanMVar $ \_ -> pure ()
  waitEither connThread waitForConnThread >>= \case
    Left () -> throwIO $ RabbitMqConnectionFailed "connection thread finished before getting connection"
    Right () -> pure chanMVar
-- | Thrown by 'mkRabbitMqChannelMVar' when no channel could be obtained;
-- the 'String' is a human-readable description.
data RabbitMqConnectionError = RabbitMqConnectionFailed String
  deriving (Show)

instance Exception RabbitMqConnectionError
-- | Assemble AMQP connection options for the given endpoint.
--
-- Credentials are read from the environment with 'readCredsFromEnv' on every
-- call; TLS settings are built with 'mkTLSSettings' when the endpoint has
-- TLS options. The second argument becomes the connection name.
mkConnectionOpts :: (MonadIO m) => AmqpEndpoint -> Maybe Text -> m Q.ConnectionOpts
mkConnectionOpts AmqpEndpoint {..} name = do
  mTlsSettings <- traverse (liftIO . mkTLSSettings host) tls
  (username, password) <- liftIO readCredsFromEnv
  pure
    Q.defaultConnectionOpts
      { Q.coServers = [(host, fromIntegral port)],
        Q.coVHost = vHost,
        Q.coAuth = [Q.plain username password],
        Q.coTLSSettings = fmap Q.TLSCustom mTlsSettings,
        Q.coName = name
      }
-- | Connect to RabbitMQ, open a channel and hand it to 'onNewChannel',
-- retrying the connection attempt until it succeeds.
--
-- * Connection attempts are retried with a capped, jittered backoff; every
--   failure is logged.
-- * When the connection closes, 'onConnectionClose' runs and a fresh
--   connection is established the same way.
-- * When a channel fails for a reason other than a normal close or a closed
--   connection, a new channel is opened on the same connection.
--
-- The connection is closed when the body (which ends with the
-- 'onNewChannel' hook) returns or throws.
openConnectionWithRetries ::
  forall m.
  (MonadIO m, MonadMask m, MonadBaseControl IO m) =>
  Logger ->
  AmqpEndpoint ->
  Maybe Text ->
  RabbitMqHooks m ->
  m ()
openConnectionWithRetries l AmqpEndpoint {..} connName hooks = do
  (username, password) <- liftIO readCredsFromEnv
  connectWithRetries username password
  where
    -- The credentials are only threaded through to the reconnect call;
    -- 'mkConnectionOpts' reads them from the environment itself.
    connectWithRetries :: Text -> Text -> m ()
    connectWithRetries username password = do
      -- Full-jitter backoff with base 1000, capped at 5_000_000 (Control.Retry
      -- delays are in microseconds, i.e. 1ms base and 5s cap).
      let policy = capDelay 5_000_000 $ fullJitterBackoff 1000
          logError willRetry e retryStatus = do
            Log.err l $
              Log.msg (Log.val "Failed to connect to RabbitMQ")
                . Log.field "error" (displayException @SomeException e)
                . Log.field "willRetry" willRetry
                . Log.field "retryCount" retryStatus.rsIterNumber
          getConn = do
            Log.info l $ Log.msg (Log.val "About to enter recovering...")
            conn <-
              recovering
                policy
                -- Async exceptions are logged and not retried; any other
                -- exception is logged and always retried.
                ( logAndSkipAsyncExceptions l
                    <> [logRetries (const $ pure True) logError]
                )
                ( const $ do
                    Log.info l $ Log.msg (Log.val "Trying to connect to RabbitMQ")
                    connOpts <- mkConnectionOpts AmqpEndpoint {..} connName
                    liftIO $ Q.openConnection'' connOpts
                )
            Log.info l $ Log.msg (Log.val "Retrieved connection...")
            pure conn
      bracket getConn (liftIO . Q.closeConnection) $ \conn -> do
        -- The handler is an IO callback, so 'runInIO' is used to run the
        -- hook and the reconnect in @m@.
        liftBaseWith $ \runInIO ->
          Q.addConnectionClosedHandler conn True $ void $ runInIO $ do
            hooks.onConnectionClose
              `catch` logException l "onConnectionClose hook threw an exception, reconnecting to RabbitMQ anyway"
            connectWithRetries username password
        openChan conn

    -- Open a channel, register the exception handler on it and pass it to
    -- the 'onNewChannel' hook.
    openChan :: Q.Connection -> m ()
    openChan conn = do
      Log.info l $ Log.msg (Log.val "Opening channel with RabbitMQ")
      chan <- liftIO $ Q.openChannel conn
      liftBaseWith $ \runInIO ->
        Q.addChannelExceptionHandler chan (void . runInIO . chanExceptionHandler conn)
      Log.info l $ Log.msg (Log.val "RabbitMQ channel opened")
      hooks.onNewChannel chan

    -- Run the 'onChannelException' hook (logging anything it throws), then
    -- reopen the channel unless it was closed normally or the whole
    -- connection is closed.
    chanExceptionHandler :: Q.Connection -> SomeException -> m ()
    chanExceptionHandler conn e = do
      hooks.onChannelException e
        `catch` logException l "onChannelException hook threw an exception"
      case (Q.isNormalChannelClose e, fromException e) of
        (True, _) ->
          Log.info l $
            Log.msg (Log.val "RabbitMQ channel is closed normally, not attempting to reopen channel")
        (_, Just (Q.ConnectionClosedException {})) ->
          Log.info l $
            Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel")
        _ -> do
          logException l "RabbitMQ channel closed" e
          openChan conn
-- | Retry handlers for 'AsyncException' and 'SomeAsyncException' that log
-- the exception and answer 'False', i.e. ask the retry machinery not to
-- retry.
logAndSkipAsyncExceptions :: (MonadIO m) => Logger -> [RetryStatus -> Control.Monad.Catch.Handler m Bool]
logAndSkipAsyncExceptions l = handlers
  where
    asyncH _ = Handler $ \(e :: AsyncException) -> do
      logException l "AsyncException caught" (SomeException e)
      pure False

    someAsyncH _ = Handler $ \(e :: SomeAsyncException) -> do
      logException l "SomeAsyncException caught" (SomeException e)
      pure False

    handlers = [asyncH, someAsyncH]
-- | Build TLS client settings for the given host name from
-- 'RabbitMqTlsOpts'.
--
-- * If 'caCert' is set and the store at that path can be read, it becomes
--   the client's CA store; otherwise the default shared parameters are used.
-- * If 'insecureSkipVerifyTls' is set, the server-certificate hook is
--   replaced by one that always reports no failures.
-- * Only TLS 1.3 and TLS 1.2 with the @ciphersuite_strong@ ciphers are
--   offered.
mkTLSSettings :: HostName -> RabbitMqTlsOpts -> IO TLSSettings
mkTLSSettings host opts = do
  -- NOTE(review): a configured but unreadable caCert yields 'Nothing' here
  -- and is therefore indistinguishable from an unset one — confirm that
  -- silently falling back is intended.
  setCAStore <- runMaybeT $ do
    path <- maybe mzero pure opts.caCert
    store <- MaybeT $ X509.readCertificateStore path
    pure $ \shared -> shared {sharedCAStore = store}
  let setHooks =
        if opts.insecureSkipVerifyTls
          then \h -> h {onServerCertificate = \_ _ _ _ -> pure []}
          else id
  pure $
    TLSSettings
      (defaultParamsClient host "rabbitmq")
        { clientShared = fromMaybe id setCAStore def,
          clientHooks = setHooks def,
          clientSupported =
            def
              { supportedVersions = [TLS13, TLS12],
                supportedCiphers = ciphersuite_strong
              }
        }
-- | Log an exception at error level: the given text is the log message and
-- the exception's 'displayException' rendering goes into the @error@ field.
logException :: (MonadIO m) => Logger -> String -> SomeException -> m ()
logException l m (SomeException e) = do
  Log.err l $
    Log.msg m
      . Log.field "error" (displayException e)
-- | Read the RabbitMQ credentials as @(username, password)@ from the
-- environment variables @RABBITMQ_USERNAME@ and @RABBITMQ_PASSWORD@.
readCredsFromEnv :: IO (Text, Text)
readCredsFromEnv =
  (,)
    <$> (Text.pack <$> getEnv "RABBITMQ_USERNAME")
    <*> (Text.pack <$> getEnv "RABBITMQ_PASSWORD")