{-# LANGUAGE RecordWildCards #-}

module Network.AMQP.Extended
  ( RabbitMqHooks (..),
    RabbitMqAdminOpts (..),
    AmqpEndpoint (..),
    withConnection,
    openConnectionWithRetries,
    mkRabbitMqAdminClientEnv,
    mkRabbitMqAdminClientEnvWithCreds,
    mkRabbitMqChannelMVar,
    demoteOpts,
    RabbitMqTlsOpts (..),
    mkConnectionOpts,
    mkTLSSettings,
    readCredsFromEnv,
  )
where

import Control.Exception (throwIO)
import Control.Monad.Catch
import Control.Monad.Trans.Control
import Control.Monad.Trans.Maybe
import Control.Retry
import Data.Aeson
import Data.Aeson.Types
import Data.Default
import Data.Proxy
import Data.Text qualified as Text
import Data.Text.Encoding qualified as Text
import Data.X509.CertificateStore qualified as X509
import Imports
import Network.AMQP qualified as Q
import Network.Connection as Conn
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Client.TLS qualified as HTTP
import Network.RabbitMqAdmin
import Network.TLS
import Network.TLS.Extra.Cipher
import Servant
import Servant.Client
import Servant.Client qualified as Servant
import System.Logger (Logger)
import System.Logger qualified as Log
import UnliftIO.Async

-- | Callbacks handed to 'openConnectionWithRetries'. They run in the monad
-- @m@ chosen by the caller.
data RabbitMqHooks m = RabbitMqHooks
  { -- | Called whenever there is a new channel. At any time there should be at
    -- most one open channel. Perhaps this would need to change in future.
    onNewChannel :: Q.Channel -> m (),
    -- | Called when the connection is closed. Any exceptions thrown by this
    -- would be logged and ignored.
    onConnectionClose :: m (),
    -- | Called when the channel is closed. Any exceptions thrown by this would
    -- be logged and ignored.
    onChannelException :: SomeException -> m ()
  }

-- | TLS options for talking to RabbitMQ. In this module they are produced by
-- 'parseTlsJson' (only when TLS is enabled) and consumed by 'mkTLSSettings'.
data RabbitMqTlsOpts = RabbitMqTlsOpts
  { -- | Optional path to a CA certificate, read from the @caCert@ JSON key.
    -- NOTE(review): presumably used as the trust anchor instead of the system
    -- store; confirm against 'mkTLSSettings'.
    caCert :: !(Maybe FilePath),
    -- | Read from the @insecureSkipVerifyTls@ JSON key; defaults to 'False'
    -- when the key is absent.
    insecureSkipVerifyTls :: Bool
  }
  deriving (Eq, Show)

-- | Parse the optional TLS settings out of an endpoint's JSON object.
--
-- The result is 'Nothing' unless the @enableTls@ key is present and true (a
-- missing key counts as 'False'). When TLS is enabled, @caCert@ is optional
-- and @insecureSkipVerifyTls@ defaults to 'False'.
parseTlsJson :: Object -> Parser (Maybe RabbitMqTlsOpts)
parseTlsJson v = do
  enabled <- v .:? "enableTls" .!= False
  if enabled
    then
      Just
        <$> ( RabbitMqTlsOpts
                <$> v .:? "caCert"
                <*> v .:? "insecureSkipVerifyTls" .!= False
            )
    else pure Nothing

-- | Connection details for RabbitMQ including the port of its admin HTTP API.
-- Use 'demoteOpts' to drop the admin port and obtain an 'AmqpEndpoint'.
data RabbitMqAdminOpts = RabbitMqAdminOpts
  { host :: !String,
    port :: !Int,
    vHost :: !Text,
    -- | 'Nothing' means TLS is disabled; the admin client then uses plain
    -- HTTP.
    tls :: Maybe RabbitMqTlsOpts,
    -- | Port used for the base URL of the admin API client.
    adminPort :: !Int
  }
  deriving (Eq, Show)

-- | Expects an object with the required keys @host@, @port@, @vHost@ and
-- @adminPort@. The TLS keys are read from the same object by 'parseTlsJson'.
instance FromJSON RabbitMqAdminOpts where
  parseJSON = withObject "RabbitMqAdminOpts" $ \v ->
    RabbitMqAdminOpts
      <$> v .: "host"
      <*> v .: "port"
      <*> v .: "vHost"
      <*> parseTlsJson v
      <*> v .: "adminPort"

-- | Build a client for the RabbitMQ admin API that runs directly in 'IO',
-- authenticating with the given username and password via HTTP basic auth.
--
-- The client talks HTTPS when @opts.tls@ is set (with TLS settings derived
-- from it by 'mkTLSSettings') and plain HTTP otherwise. It targets
-- @opts.host@ on @opts.adminPort@ with an empty base path. A fresh HTTP
-- manager is created on every call.
--
-- Any 'ClientError' returned by a request is rethrown as an exception.
mkRabbitMqAdminClientEnvWithCreds :: RabbitMqAdminOpts -> Text -> Text -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnvWithCreds opts username password = do
  mTlsSettings <- traverse (mkTLSSettings opts.host) opts.tls
  -- The URL scheme and the manager settings must agree, so pick them together.
  let (protocol, managerSettings) = case mTlsSettings of
        Nothing -> (Servant.Http, HTTP.defaultManagerSettings)
        Just tlsSettings -> (Servant.Https, HTTP.mkManagerSettings tlsSettings Nothing)
  manager <- HTTP.newManager managerSettings
  let basicAuthData = Servant.BasicAuthData (Text.encodeUtf8 username) (Text.encodeUtf8 password)
      clientEnv = Servant.mkClientEnv manager (Servant.BaseUrl protocol opts.host opts.adminPort "")
  pure . fromServant $
    hoistClient
      (Proxy @(ToServant AdminAPI AsApi))
      -- Run each request against 'clientEnv' and turn a 'Left' into a throw.
      (either throwM pure <=< flip runClientM clientEnv)
      (toServant $ adminClient basicAuthData)

-- | Like 'mkRabbitMqAdminClientEnvWithCreds', but takes the credentials from
-- 'readCredsFromEnv'. The first component of the pair it returns is used as
-- the username and the second as the password.
mkRabbitMqAdminClientEnv :: RabbitMqAdminOpts -> IO (AdminAPI (AsClientT IO))
mkRabbitMqAdminClientEnv opts = readCredsFromEnv >>= uncurry (mkRabbitMqAdminClientEnvWithCreds opts)

-- | Connection details for the RabbitMQ AMQP endpoint. This carries the same
-- fields as 'RabbitMqAdminOpts' except the admin port. When access to the
-- admin API is needed use 'RabbitMqAdminOpts'; 'demoteOpts' converts those
-- into an 'AmqpEndpoint'.
data AmqpEndpoint = AmqpEndpoint
  { host :: !String,
    port :: !Int,
    vHost :: !Text,
    -- | 'Nothing' means TLS is disabled.
    tls :: !(Maybe RabbitMqTlsOpts)
  }
  deriving (Eq, Show)

-- | Expects an object with the required keys @host@, @port@ and @vHost@. The
-- TLS keys are read from the same object by 'parseTlsJson'.
instance FromJSON AmqpEndpoint where
  parseJSON = withObject "AmqpEndpoint" $ \v ->
    AmqpEndpoint
      <$> v .: "host"
      <*> v .: "port"
      <*> v .: "vHost"
      <*> parseTlsJson v

-- | Forget the admin port: copy @host@, @port@, @vHost@ and @tls@ from the
-- admin options into an 'AmqpEndpoint'.
demoteOpts :: RabbitMqAdminOpts -> AmqpEndpoint
-- The wildcard pattern binds every field of 'RabbitMqAdminOpts'; the wildcard
-- construction picks up the four that 'AmqpEndpoint' has. @adminPort@ is
-- dropped.
demoteOpts RabbitMqAdminOpts {..} = AmqpEndpoint {..}

-- | Useful if the application only pushes into some queues.
--
-- Starts 'openConnectionWithRetries' in a background thread and returns an
-- 'MVar' that is filled with each new channel. The hooks empty it again when
-- the channel throws or the connection closes. This call blocks until the
-- 'MVar' has been filled for the first time.
--
-- Throws 'RabbitMqConnectionFailed' if the connection thread finishes before
-- any channel was made available. An exception from the connection thread
-- during that wait is rethrown here by 'waitEither'.
mkRabbitMqChannelMVar :: Logger -> AmqpEndpoint -> IO (MVar Q.Channel)
mkRabbitMqChannelMVar l opts = do
  chanMVar <- newEmptyMVar
  connThread <-
    async . openConnectionWithRetries l opts $
      RabbitMqHooks
        { -- Publish the channel, then never return from the hook.
          -- NOTE(review): presumably the channel is torn down once this hook
          -- returns; confirm against 'openConnectionWithRetries'.
          onNewChannel = \chan -> putMVar chanMVar chan >> forever (threadDelay maxBound),
          onChannelException = \_ -> void $ tryTakeMVar chanMVar,
          onConnectionClose = void $ tryTakeMVar chanMVar
        }
  -- Completes as soon as the MVar holds a channel; the channel is put back.
  waitForConnThread <- async $ withMVar chanMVar $ \_ -> pure ()
  waitEither connThread waitForConnThread >>= \case
    Left () -> throwIO $ RabbitMqConnectionFailed "connection thread finished before getting connection"
    Right () -> pure chanMVar

-- | Thrown by 'mkRabbitMqChannelMVar' when no channel could be obtained. The
-- 'String' is a human-readable reason.
data RabbitMqConnectionError = RabbitMqConnectionFailed String
  deriving (Show)

instance Exception RabbitMqConnectionError

-- | Connects to RabbitMQ, runs the given action with the open connection and
-- closes the connection afterwards, also when the action throws. No channel
-- is opened here; that is left to the action.
--
-- Connecting is retried on any synchronous exception, and every failed
-- attempt is logged at error level. Asynchronous exceptions are not retried.
-- Once the retry budget is used up the last exception propagates to the
-- caller.
withConnection ::
  forall m a.
  (MonadIO m, MonadMask m) =>
  Logger ->
  AmqpEndpoint ->
  (Q.Connection -> m a) ->
  m a
withConnection l AmqpEndpoint {..} k = do
  -- Jittered exponential backoff with 1ms as starting delay and 1s as total
  -- wait time.
  let policy = limitRetriesByCumulativeDelay 1_000_000 $ fullJitterBackoff 1000
      logError willRetry e retryStatus = do
        Log.err l $
          Log.msg (Log.val "Failed to connect to RabbitMQ")
            . Log.field "error" (displayException @SomeException e)
            . Log.field "willRetry" willRetry
            . Log.field "retryCount" retryStatus.rsIterNumber
      getConn =
        recovering
          policy
          ( skipAsyncExceptions
              -- Retry on every other exception, logging each failure.
              <> [logRetries (const $ pure True) logError]
          )
          ( const $ do
              Log.info l $ Log.msg (Log.val "Trying to connect to RabbitMQ")
              connOpts <- mkConnectionOpts AmqpEndpoint {..}
              liftIO $ Q.openConnection'' connOpts
          )
  bracket getConn (liftIO . Q.closeConnection) k

-- | Build the 'Q.ConnectionOpts' for the RabbitMQ server described by an
-- 'AmqpEndpoint'.
--
-- * The server list has exactly one entry: @(host, port)@.
-- * The virtual host is taken from @vHost@.
-- * Authentication uses 'Q.plain' with the credentials returned by
--   'readCredsFromEnv'; they are re-read on every call.
-- * When @tls@ is 'Just', the TLS settings are produced by 'mkTLSSettings'
--   for @host@ and wrapped in 'Q.TLSCustom'; when it is 'Nothing',
--   'Q.coTLSSettings' is 'Nothing'.
mkConnectionOpts :: (MonadIO m) => AmqpEndpoint -> m Q.ConnectionOpts
mkConnectionOpts AmqpEndpoint {..} = do
  mTlsSettings <- traverse (liftIO . mkTLSSettings host) tls
  (username, password) <- liftIO readCredsFromEnv
  pure
    Q.defaultConnectionOpts
      { Q.coServers = [(host, fromIntegral port)],
        Q.coVHost = vHost,
        Q.coAuth = [Q.plain username password],
        Q.coTLSSettings = fmap Q.TLSCustom mTlsSettings
      }

-- | Connects to RabbitMQ and opens a channel. If the channel is closed for
-- some reason, reopens the channel. If the connection is closed for some
-- reason, keeps retrying to connect until it works.
--
-- The credentials are read once up front with 'readCredsFromEnv', outside of
-- the retry loop. Every exception raised while connecting is logged via the
-- given 'Logger' and retried (asynchronous exceptions are not retried, see
-- 'skipAsyncExceptions').
--
-- Hooks:
--
-- * 'onNewChannel' runs after every successful channel open.
-- * 'onConnectionClose' runs from the connection-closed handler, before
--   reconnecting; exceptions it throws are logged and ignored.
-- * 'onChannelException' runs from the channel exception handler; exceptions
--   it throws are logged and ignored.
openConnectionWithRetries ::
  forall m.
  (MonadIO m, MonadMask m, MonadBaseControl IO m) =>
  Logger ->
  AmqpEndpoint ->
  RabbitMqHooks m ->
  m ()
openConnectionWithRetries l AmqpEndpoint {..} hooks = do
  (username, password) <- liftIO readCredsFromEnv
  connectWithRetries username password
  where
    -- Establish a connection (retrying until it succeeds), register the
    -- reconnect-on-close handler and open the first channel.
    --
    -- NOTE(review): @username@ and @password@ are only passed on to the
    -- recursive call below; the credentials actually used for the connection
    -- are re-read by 'mkConnectionOpts' on every attempt.
    connectWithRetries :: Text -> Text -> m ()
    connectWithRetries username password = do
      -- Jittered exponential backoff with 1ms as starting delay and 5s as max
      -- delay.
      let policy = capDelay 5_000_000 $ fullJitterBackoff 1000
          logError willRetry e retryStatus = do
            Log.err l $
              Log.msg (Log.val "Failed to connect to RabbitMQ")
                . Log.field "error" (displayException @SomeException e)
                . Log.field "willRetry" willRetry
                . Log.field "retryCount" retryStatus.rsIterNumber
          getConn =
            recovering
              policy
              ( skipAsyncExceptions
                  -- Retry on every (synchronous) exception, logging each
                  -- failed attempt.
                  <> [logRetries (const $ pure True) logError]
              )
              ( const $ do
                  Log.info l $ Log.msg (Log.val "Trying to connect to RabbitMQ")
                  connOpts <- mkConnectionOpts AmqpEndpoint {..}
                  liftIO $ Q.openConnection'' connOpts
              )
      -- The connection is closed again as soon as the body below returns or
      -- throws.
      bracket getConn (liftIO . Q.closeConnection) $ \conn -> do
        -- The AMQP handler runs in 'IO', so the @m@ action is run through
        -- 'liftBaseWith' and its resulting monadic state is discarded.
        liftBaseWith $ \runInIO ->
          Q.addConnectionClosedHandler conn True $ void $ runInIO $ do
            hooks.onConnectionClose
              `catch` logException l "onConnectionClose hook threw an exception, reconnecting to RabbitMQ anyway"
            connectWithRetries username password
        openChan conn

    -- Open a channel on the given connection, install the channel exception
    -- handler and hand the channel to the 'onNewChannel' hook.
    openChan :: Q.Connection -> m ()
    openChan conn = do
      Log.info l $ Log.msg (Log.val "Opening channel with RabbitMQ")
      chan <- liftIO $ Q.openChannel conn
      liftBaseWith $ \runInIO ->
        Q.addChannelExceptionHandler chan (void . runInIO . chanExceptionHandler conn)
      Log.info l $ Log.msg (Log.val "RabbitMQ channel opened")
      hooks.onNewChannel chan

    -- Invoked when the channel reports an exception. Runs the
    -- 'onChannelException' hook, then reopens the channel unless the channel
    -- was closed normally or the whole connection is closed (the latter case
    -- is left to the connection-closed handler).
    chanExceptionHandler :: Q.Connection -> SomeException -> m ()
    chanExceptionHandler conn e = do
      hooks.onChannelException e `catch` logException l "onChannelException hook threw an exception"
      case (Q.isNormalChannelClose e, fromException e) of
        (True, _) ->
          Log.info l $
            Log.msg (Log.val "RabbitMQ channel is closed normally, not attempting to reopen channel")
        (_, Just (Q.ConnectionClosedException {})) ->
          Log.info l $
            Log.msg (Log.val "RabbitMQ connection is closed, not attempting to reopen channel")
        _ -> do
          logException l "RabbitMQ channel closed" e
          openChan conn

-- | Build the client-side 'TLSSettings' for talking to RabbitMQ on the given
-- host.
--
-- * If 'caCert' is set and 'X509.readCertificateStore' yields a store for
--   that path, the store is installed as 'sharedCAStore'. If 'caCert' is
--   'Nothing', or the store cannot be read ('X509.readCertificateStore'
--   returns 'Nothing'), the default 'Shared' value is used unchanged and no
--   error is reported.
-- * If 'insecureSkipVerifyTls' is set, the 'onServerCertificate' hook is
--   replaced by one that always reports an empty list of failure reasons,
--   i.e. the server certificate is never rejected.
-- * Only TLS 1.3 and TLS 1.2 are offered, with 'ciphersuite_strong'.
mkTLSSettings :: HostName -> RabbitMqTlsOpts -> IO TLSSettings
mkTLSSettings host opts = do
  -- 'Nothing' when no CA certificate is configured or it could not be loaded.
  setCAStore <- runMaybeT $ do
    path <- maybe mzero pure opts.caCert
    store <- MaybeT $ X509.readCertificateStore path
    pure $ \shared -> shared {sharedCAStore = store}
  let setHooks =
        if opts.insecureSkipVerifyTls
          then \h -> h {onServerCertificate = \_ _ _ _ -> pure []}
          else id
  pure $
    TLSSettings
      (defaultParamsClient host "rabbitmq")
        { clientShared = fromMaybe id setCAStore def,
          clientHooks = setHooks def,
          clientSupported =
            def
              { supportedVersions = [TLS13, TLS12],
                supportedCiphers = ciphersuite_strong
              }
        }

-- | Log an exception at error level.
--
-- The second argument becomes the log message; the exception wrapped in the
-- 'SomeException' is rendered with 'displayException' and attached as the
-- @"error"@ field.
logException :: (MonadIO m) => Logger -> String -> SomeException -> m ()
logException l m (SomeException e) =
  Log.err l $
    Log.msg m
      . Log.field "error" (displayException e)

-- | Read the RabbitMQ credentials from the process environment.
--
-- Returns @(username, password)@, taken from the @RABBITMQ_USERNAME@ and
-- @RABBITMQ_PASSWORD@ environment variables respectively.
--
-- NOTE(review): presumably throws when either variable is unset, as
-- @System.Environment.getEnv@ does — confirm against the 'getEnv' that is in
-- scope here.
readCredsFromEnv :: IO (Text, Text)
readCredsFromEnv =
  (,)
    <$> (Text.pack <$> getEnv "RABBITMQ_USERNAME")
    <*> (Text.pack <$> getEnv "RABBITMQ_PASSWORD")