{-# LANGUAGE NumericUnderscores #-}

module Galley.Intra.BackendNotificationQueue (interpretBackendNotificationQueueAccess) where

import Control.Lens (view)
import Control.Monad.Catch
import Control.Monad.Trans.Except
import Control.Retry
import Data.Domain
import Data.Qualified
import Galley.Cassandra.Util
import Galley.Effects.BackendNotificationQueueAccess (BackendNotificationQueueAccess (..))
import Galley.Env
import Galley.Monad
import Galley.Options
import Imports
import Network.AMQP qualified as Q
import Polysemy
import Polysemy.Input
import Polysemy.TinyLog
import System.Logger.Class qualified as Log
import UnliftIO
import Wire.API.Federation.BackendNotifications
import Wire.API.Federation.Error

interpretBackendNotificationQueueAccess ::
  ( Member (Embed IO) r,
    Member (Input Env) r,
    Member TinyLog r
  ) =>
  Sem (BackendNotificationQueueAccess ': r) a ->
  Sem r a
interpretBackendNotificationQueueAccess :: forall (r :: EffectRow) a.
(Member (Embed IO) r, Member (Input Env) r, Member TinyLog r) =>
Sem (BackendNotificationQueueAccess : r) a -> Sem r a
interpretBackendNotificationQueueAccess = (forall (rInitial :: EffectRow) x.
 BackendNotificationQueueAccess (Sem rInitial) x -> Sem r x)
-> Sem (BackendNotificationQueueAccess : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret ((forall (rInitial :: EffectRow) x.
  BackendNotificationQueueAccess (Sem rInitial) x -> Sem r x)
 -> Sem (BackendNotificationQueueAccess : r) a -> Sem r a)
-> (forall (rInitial :: EffectRow) x.
    BackendNotificationQueueAccess (Sem rInitial) x -> Sem r x)
-> Sem (BackendNotificationQueueAccess : r) a
-> Sem r a
forall a b. (a -> b) -> a -> b
$ \case
  EnqueueNotification DeliveryMode
deliveryMode Remote x
remote FedQueueClient c a1
action -> do
    ByteString -> Sem r ()
forall (r :: EffectRow). Member TinyLog r => ByteString -> Sem r ()
logEffect ByteString
"BackendNotificationQueueAccess.EnqueueNotification"
    App x -> Sem r x
forall (r :: EffectRow) a.
(Member (Embed IO) r, Member (Input Env) r) =>
App a -> Sem r a
embedApp (App x -> Sem r x)
-> (ExceptT FederationError App a1 -> App x)
-> ExceptT FederationError App a1
-> Sem r x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT FederationError App a1 -> App x
ExceptT FederationError App a1 -> App (Either FederationError a1)
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT FederationError App a1 -> Sem r x)
-> ExceptT FederationError App a1 -> Sem r x
forall a b. (a -> b) -> a -> b
$ DeliveryMode
-> Domain -> FedQueueClient c a1 -> ExceptT FederationError App a1
forall {k} (c :: k) a.
DeliveryMode
-> Domain -> FedQueueClient c a -> ExceptT FederationError App a
enqueueNotification DeliveryMode
deliveryMode (Remote x -> Domain
forall (t :: QTag) a. QualifiedWithTag t a -> Domain
tDomain Remote x
remote) FedQueueClient c a1
action
  EnqueueNotificationsConcurrently DeliveryMode
m f (Remote x)
xs Remote [x] -> FedQueueClient c a1
rpc -> do
    ByteString -> Sem r ()
forall (r :: EffectRow). Member TinyLog r => ByteString -> Sem r ()
logEffect ByteString
"BackendNotificationQueueAccess.EnqueueNotificationsConcurrently"
    App x -> Sem r x
forall (r :: EffectRow) a.
(Member (Embed IO) r, Member (Input Env) r) =>
App a -> Sem r a
embedApp (App x -> Sem r x)
-> (ExceptT FederationError App [Remote a1] -> App x)
-> ExceptT FederationError App [Remote a1]
-> Sem r x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT FederationError App [Remote a1] -> App x
ExceptT FederationError App [Remote a1]
-> App (Either FederationError [Remote a1])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT FederationError App [Remote a1] -> Sem r x)
-> ExceptT FederationError App [Remote a1] -> Sem r x
forall a b. (a -> b) -> a -> b
$ DeliveryMode
-> f (Remote x)
-> (Remote [x] -> FedQueueClient c a1)
-> ExceptT FederationError App [Remote a1]
forall {k} (f :: * -> *) x (c :: k) a.
(Foldable f, Functor f) =>
DeliveryMode
-> f (Remote x)
-> (Remote [x] -> FedQueueClient c a)
-> ExceptT FederationError App [Remote a]
enqueueNotificationsConcurrently DeliveryMode
m f (Remote x)
xs Remote [x] -> FedQueueClient c a1
rpc
  EnqueueNotificationsConcurrentlyBuckets DeliveryMode
m f (Remote x)
xs Remote x -> FedQueueClient c a1
rpc -> do
    ByteString -> Sem r ()
forall (r :: EffectRow). Member TinyLog r => ByteString -> Sem r ()
logEffect ByteString
"BackendNotificationQueueAccess.EnqueueNotificationsConcurrentlyBuckets"
    App x -> Sem r x
forall (r :: EffectRow) a.
(Member (Embed IO) r, Member (Input Env) r) =>
App a -> Sem r a
embedApp (App x -> Sem r x)
-> (ExceptT FederationError App [Remote a1] -> App x)
-> ExceptT FederationError App [Remote a1]
-> Sem r x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExceptT FederationError App [Remote a1] -> App x
ExceptT FederationError App [Remote a1]
-> App (Either FederationError [Remote a1])
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT FederationError App [Remote a1] -> Sem r x)
-> ExceptT FederationError App [Remote a1] -> Sem r x
forall a b. (a -> b) -> a -> b
$ DeliveryMode
-> f (Remote x)
-> (Remote x -> FedQueueClient c a1)
-> ExceptT FederationError App [Remote a1]
forall {k} (f :: * -> *) x (c :: k) a.
Foldable f =>
DeliveryMode
-> f (Remote x)
-> (Remote x -> FedQueueClient c a)
-> ExceptT FederationError App [Remote a]
enqueueNotificationsConcurrentlyBuckets DeliveryMode
m f (Remote x)
xs Remote x -> FedQueueClient c a1
rpc

getChannel :: ExceptT FederationError App (MVar Q.Channel)
getChannel :: ExceptT FederationError App (MVar Channel)
getChannel = Getting (Maybe (MVar Channel)) Env (Maybe (MVar Channel))
-> ExceptT FederationError App (Maybe (MVar Channel))
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting (Maybe (MVar Channel)) Env (Maybe (MVar Channel))
Lens' Env (Maybe (MVar Channel))
rabbitmqChannel ExceptT FederationError App (Maybe (MVar Channel))
-> (Maybe (MVar Channel)
    -> ExceptT FederationError App (MVar Channel))
-> ExceptT FederationError App (MVar Channel)
forall a b.
ExceptT FederationError App a
-> (a -> ExceptT FederationError App b)
-> ExceptT FederationError App b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ExceptT FederationError App (MVar Channel)
-> (MVar Channel -> ExceptT FederationError App (MVar Channel))
-> Maybe (MVar Channel)
-> ExceptT FederationError App (MVar Channel)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (FederationError -> ExceptT FederationError App (MVar Channel)
forall (m :: * -> *) e a. Monad m => e -> ExceptT e m a
throwE FederationError
FederationNotConfigured) MVar Channel -> ExceptT FederationError App (MVar Channel)
forall a. a -> ExceptT FederationError App a
forall (f :: * -> *) a. Applicative f => a -> f a
pure

enqueueSingleNotification :: Domain -> Q.DeliveryMode -> MVar Q.Channel -> FedQueueClient c a -> App a
enqueueSingleNotification :: forall {k} (c :: k) a.
Domain
-> DeliveryMode -> MVar Channel -> FedQueueClient c a -> App a
enqueueSingleNotification Domain
remoteDomain DeliveryMode
deliveryMode MVar Channel
chanVar FedQueueClient c a
action = do
  Domain
ownDomain <- Getting Domain Env Domain -> App Domain
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view ((Opts -> Const Domain Opts) -> Env -> Const Domain Env
Lens' Env Opts
options ((Opts -> Const Domain Opts) -> Env -> Const Domain Env)
-> ((Domain -> Const Domain Domain) -> Opts -> Const Domain Opts)
-> Getting Domain Env Domain
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Settings -> Const Domain Settings) -> Opts -> Const Domain Opts
Lens' Opts Settings
settings ((Settings -> Const Domain Settings) -> Opts -> Const Domain Opts)
-> ((Domain -> Const Domain Domain)
    -> Settings -> Const Domain Settings)
-> (Domain -> Const Domain Domain)
-> Opts
-> Const Domain Opts
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Domain -> Const Domain Domain)
-> Settings -> Const Domain Settings
Lens' Settings Domain
federationDomain)
  let policy :: RetryPolicyM App
policy = Int -> RetryPolicy
limitRetries Int
3 RetryPolicyM App -> RetryPolicyM App -> RetryPolicyM App
forall a. Semigroup a => a -> a -> a
<> Int -> RetryPolicyM App
forall (m :: * -> *). Monad m => Int -> RetryPolicyM m
constantDelay Int
1_000_000
      handlers :: [RetryStatus -> Handler App Bool]
handlers =
        [RetryStatus -> Handler App Bool]
forall (m :: * -> *). MonadIO m => [RetryStatus -> Handler m Bool]
skipAsyncExceptions
          [RetryStatus -> Handler App Bool]
-> [RetryStatus -> Handler App Bool]
-> [RetryStatus -> Handler App Bool]
forall a. Semigroup a => a -> a -> a
<> [(SomeException -> App Bool)
-> (Bool -> SomeException -> RetryStatus -> App ())
-> RetryStatus
-> Handler App Bool
forall (m :: * -> *) e.
(Monad m, Exception e) =>
(e -> m Bool)
-> (Bool -> e -> RetryStatus -> m ())
-> RetryStatus
-> Handler m Bool
logRetries (App Bool -> SomeException -> App Bool
forall a b. a -> b -> a
const (App Bool -> SomeException -> App Bool)
-> App Bool -> SomeException -> App Bool
forall a b. (a -> b) -> a -> b
$ Bool -> App Bool
forall a. a -> App a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True) Bool -> SomeException -> RetryStatus -> App ()
forall {m :: * -> *} {p} {a} {p}.
(MonadReader Env m, MonadLogger m, ToBytes p, ToBytes a,
 HasField "rsIterNumber" p a) =>
p -> SomeException -> p -> m ()
logError]
  RetryPolicyM App
-> [RetryStatus -> Handler App Bool]
-> (RetryStatus -> App a)
-> App a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m
-> [RetryStatus -> Handler m Bool] -> (RetryStatus -> m a) -> m a
recovering RetryPolicyM App
policy [RetryStatus -> Handler App Bool]
handlers (App a -> RetryStatus -> App a
forall a b. a -> b -> a
const (App a -> RetryStatus -> App a) -> App a -> RetryStatus -> App a
forall a b. (a -> b) -> a -> b
$ Domain -> App a
go Domain
ownDomain)
  where
    logError :: p -> SomeException -> p -> m ()
logError p
willRetry (SomeException e
e) p
status = do
      RequestId
rid <- Getting RequestId Env RequestId -> m RequestId
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting RequestId Env RequestId
Lens' Env RequestId
reqId
      (Msg -> Msg) -> m ()
forall (m :: * -> *). MonadLogger m => (Msg -> Msg) -> m ()
Log.err ((Msg -> Msg) -> m ()) -> (Msg -> Msg) -> m ()
forall a b. (a -> b) -> a -> b
$
        forall a. ToBytes a => a -> Msg -> Msg
Log.msg @Text Text
"failed to enqueue notification in RabbitMQ"
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> String -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"error" (e -> String
forall e. Exception e => e -> String
displayException e
e)
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> p -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"willRetry" p
willRetry
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> a -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"retryCount" p
status.rsIterNumber
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> RequestId -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"request" RequestId
rid
    go :: Domain -> App a
go Domain
ownDomain = do
      RequestId
rid <- Getting RequestId Env RequestId -> App RequestId
forall s (m :: * -> *) a. MonadReader s m => Getting a s a -> m a
view Getting RequestId Env RequestId
Lens' Env RequestId
reqId
      Maybe Channel
mChan <- Int -> App Channel -> App (Maybe Channel)
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Maybe a)
timeout Int
1_000_000 (MVar Channel -> App Channel
forall (m :: * -> *) a. MonadIO m => MVar a -> m a
readMVar MVar Channel
chanVar)
      case Maybe Channel
mChan of
        Maybe Channel
Nothing -> NoRabbitMqChannel -> App a
forall e a. (HasCallStack, Exception e) => e -> App a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM NoRabbitMqChannel
NoRabbitMqChannel
        Just Channel
chan -> do
          IO a -> App a
forall a. IO a -> App a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> App a) -> IO a -> App a
forall a b. (a -> b) -> a -> b
$ Channel
-> RequestId
-> Domain
-> Domain
-> DeliveryMode
-> FedQueueClient c a
-> IO a
forall {k} (c :: k) a.
Channel
-> RequestId
-> Domain
-> Domain
-> DeliveryMode
-> FedQueueClient c a
-> IO a
enqueue Channel
chan RequestId
rid Domain
ownDomain Domain
remoteDomain DeliveryMode
deliveryMode FedQueueClient c a
action

enqueueNotification :: Q.DeliveryMode -> Domain -> FedQueueClient c a -> ExceptT FederationError App a
enqueueNotification :: forall {k} (c :: k) a.
DeliveryMode
-> Domain -> FedQueueClient c a -> ExceptT FederationError App a
enqueueNotification DeliveryMode
deliveryMode Domain
remoteDomain FedQueueClient c a
action = do
  MVar Channel
chanVar <- ExceptT FederationError App (MVar Channel)
getChannel
  App a -> ExceptT FederationError App a
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT FederationError m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (App a -> ExceptT FederationError App a)
-> App a -> ExceptT FederationError App a
forall a b. (a -> b) -> a -> b
$ Domain
-> DeliveryMode -> MVar Channel -> FedQueueClient c a -> App a
forall {k} (c :: k) a.
Domain
-> DeliveryMode -> MVar Channel -> FedQueueClient c a -> App a
enqueueSingleNotification Domain
remoteDomain DeliveryMode
deliveryMode MVar Channel
chanVar FedQueueClient c a
action

enqueueNotificationsConcurrently ::
  (Foldable f, Functor f) =>
  Q.DeliveryMode ->
  f (Remote x) ->
  (Remote [x] -> FedQueueClient c a) ->
  ExceptT FederationError App [Remote a]
enqueueNotificationsConcurrently :: forall {k} (f :: * -> *) x (c :: k) a.
(Foldable f, Functor f) =>
DeliveryMode
-> f (Remote x)
-> (Remote [x] -> FedQueueClient c a)
-> ExceptT FederationError App [Remote a]
enqueueNotificationsConcurrently DeliveryMode
m f (Remote x)
xs Remote [x] -> FedQueueClient c a
f =
  DeliveryMode
-> [Remote [x]]
-> (Remote [x] -> FedQueueClient c a)
-> ExceptT FederationError App [Remote a]
forall {k} (f :: * -> *) x (c :: k) a.
Foldable f =>
DeliveryMode
-> f (Remote x)
-> (Remote x -> FedQueueClient c a)
-> ExceptT FederationError App [Remote a]
enqueueNotificationsConcurrentlyBuckets DeliveryMode
m (f (Remote x) -> [Remote [x]]
forall (f :: * -> *) a.
(Functor f, Foldable f) =>
f (Remote a) -> [Remote [a]]
bucketRemote f (Remote x)
xs) Remote [x] -> FedQueueClient c a
f

enqueueNotificationsConcurrentlyBuckets ::
  (Foldable f) =>
  Q.DeliveryMode ->
  f (Remote x) ->
  (Remote x -> FedQueueClient c a) ->
  ExceptT FederationError App [Remote a]
enqueueNotificationsConcurrentlyBuckets :: forall {k} (f :: * -> *) x (c :: k) a.
Foldable f =>
DeliveryMode
-> f (Remote x)
-> (Remote x -> FedQueueClient c a)
-> ExceptT FederationError App [Remote a]
enqueueNotificationsConcurrentlyBuckets DeliveryMode
m f (Remote x)
xs Remote x -> FedQueueClient c a
f = do
  case f (Remote x) -> [Remote x]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList f (Remote x)
xs of
    -- only attempt to get a channel if there is at least one notification to send
    [] -> [Remote a] -> ExceptT FederationError App [Remote a]
forall a. a -> ExceptT FederationError App a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
    [Remote x]
_ -> do
      MVar Channel
chanVar <- ExceptT FederationError App (MVar Channel)
getChannel
      App [Remote a] -> ExceptT FederationError App [Remote a]
forall (m :: * -> *) a.
Monad m =>
m a -> ExceptT FederationError m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (App [Remote a] -> ExceptT FederationError App [Remote a])
-> App [Remote a] -> ExceptT FederationError App [Remote a]
forall a b. (a -> b) -> a -> b
$ Int -> [Remote x] -> (Remote x -> App (Remote a)) -> App [Remote a]
forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
Int -> t a -> (a -> m b) -> m (t b)
pooledForConcurrentlyN Int
8 (f (Remote x) -> [Remote x]
forall a. f a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList f (Remote x)
xs) ((Remote x -> App (Remote a)) -> App [Remote a])
-> (Remote x -> App (Remote a)) -> App [Remote a]
forall a b. (a -> b) -> a -> b
$ \Remote x
r ->
        Remote x -> a -> Remote a
forall (t :: QTag) x a.
QualifiedWithTag t x -> a -> QualifiedWithTag t a
qualifyAs Remote x
r
          (a -> Remote a) -> App a -> App (Remote a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Domain
-> DeliveryMode -> MVar Channel -> FedQueueClient c a -> App a
forall {k} (c :: k) a.
Domain
-> DeliveryMode -> MVar Channel -> FedQueueClient c a -> App a
enqueueSingleNotification (Remote x -> Domain
forall (t :: QTag) a. QualifiedWithTag t a -> Domain
tDomain Remote x
r) DeliveryMode
m MVar Channel
chanVar (Remote x -> FedQueueClient c a
f Remote x
r)

data NoRabbitMqChannel = NoRabbitMqChannel
  deriving (Int -> NoRabbitMqChannel -> ShowS
[NoRabbitMqChannel] -> ShowS
NoRabbitMqChannel -> String
(Int -> NoRabbitMqChannel -> ShowS)
-> (NoRabbitMqChannel -> String)
-> ([NoRabbitMqChannel] -> ShowS)
-> Show NoRabbitMqChannel
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> NoRabbitMqChannel -> ShowS
showsPrec :: Int -> NoRabbitMqChannel -> ShowS
$cshow :: NoRabbitMqChannel -> String
show :: NoRabbitMqChannel -> String
$cshowList :: [NoRabbitMqChannel] -> ShowS
showList :: [NoRabbitMqChannel] -> ShowS
Show)

instance Exception NoRabbitMqChannel