module Wire.NotificationSubsystem.Interpreter where

import Bilge (RequestId)
import Control.Concurrent.Async (Async)
import Control.Lens (set, (.~))
import Data.Aeson
import Data.List.NonEmpty (nonEmpty)
import Data.List1 (List1)
import Data.List1 qualified as List1
import Data.Proxy
import Data.Range
import Data.Set qualified as Set
import Data.Time.Clock.DiffTime
import Imports
import Numeric.Natural (Natural)
import Polysemy
import Polysemy.Async (async, sequenceConcurrently)
import Polysemy.Async qualified as P
import Polysemy.Error
import Polysemy.Input
import Polysemy.TinyLog qualified as P
import System.Logger.Class as Log
import Wire.API.Push.V2 hiding (Push (..), Recipient, newPush)
import Wire.API.Push.V2 qualified as V2
import Wire.API.Team.Member
import Wire.GundeckAPIAccess (GundeckAPIAccess)
import Wire.GundeckAPIAccess qualified as GundeckAPIAccess
import Wire.NotificationSubsystem
import Wire.Sem.Delay

-- | We interpret this using 'GundeckAPIAccess' so we can mock it out for testing.
runNotificationSubsystemGundeck ::
  ( Member GundeckAPIAccess r,
    Member P.Async r,
    Member Delay r,
    Member (Final IO) r,
    Member P.TinyLog r
  ) =>
  NotificationSubsystemConfig ->
  Sem (NotificationSubsystem : r) a ->
  Sem r a
runNotificationSubsystemGundeck :: forall (r :: EffectRow) a.
(Member GundeckAPIAccess r, Member Async r, Member Delay r,
 Member (Final IO) r, Member TinyLog r) =>
NotificationSubsystemConfig
-> Sem (NotificationSubsystem : r) a -> Sem r a
runNotificationSubsystemGundeck NotificationSubsystemConfig
cfg = (forall (rInitial :: EffectRow) x.
 NotificationSubsystem (Sem rInitial) x -> Sem r x)
-> Sem (NotificationSubsystem : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret ((forall (rInitial :: EffectRow) x.
  NotificationSubsystem (Sem rInitial) x -> Sem r x)
 -> Sem (NotificationSubsystem : r) a -> Sem r a)
-> (forall (rInitial :: EffectRow) x.
    NotificationSubsystem (Sem rInitial) x -> Sem r x)
-> Sem (NotificationSubsystem : r) a
-> Sem r a
forall a b. (a -> b) -> a -> b
$ \case
  PushNotifications [Push]
ps -> NotificationSubsystemConfig
-> Sem (Input NotificationSubsystemConfig : r) x -> Sem r x
forall i (r :: EffectRow) a. i -> Sem (Input i : r) a -> Sem r a
runInputConst NotificationSubsystemConfig
cfg (Sem (Input NotificationSubsystemConfig : r) x -> Sem r x)
-> Sem (Input NotificationSubsystemConfig : r) x -> Sem r x
forall a b. (a -> b) -> a -> b
$ [Push] -> Sem (Input NotificationSubsystemConfig : r) ()
forall (r :: EffectRow).
(Member GundeckAPIAccess r,
 Member (Input NotificationSubsystemConfig) r, Member Async r) =>
[Push] -> Sem r ()
pushImpl [Push]
ps
  PushNotificationsSlowly [Push]
ps -> NotificationSubsystemConfig
-> Sem (Input NotificationSubsystemConfig : r) x -> Sem r x
forall i (r :: EffectRow) a. i -> Sem (Input i : r) a -> Sem r a
runInputConst NotificationSubsystemConfig
cfg (Sem (Input NotificationSubsystemConfig : r) x -> Sem r x)
-> Sem (Input NotificationSubsystemConfig : r) x -> Sem r x
forall a b. (a -> b) -> a -> b
$ [Push] -> Sem (Input NotificationSubsystemConfig : r) ()
forall (r :: EffectRow).
(Member Delay r, Member (Input NotificationSubsystemConfig) r,
 Member GundeckAPIAccess r, Member Async r) =>
[Push] -> Sem r ()
pushSlowlyImpl [Push]
ps
  PushNotificationAsync Push
ps -> NotificationSubsystemConfig
-> Sem (Input NotificationSubsystemConfig : r) x -> Sem r x
forall i (r :: EffectRow) a. i -> Sem (Input i : r) a -> Sem r a
runInputConst NotificationSubsystemConfig
cfg (Sem (Input NotificationSubsystemConfig : r) x -> Sem r x)
-> Sem (Input NotificationSubsystemConfig : r) x -> Sem r x
forall a b. (a -> b) -> a -> b
$ Push
-> Sem (Input NotificationSubsystemConfig : r) (Async (Maybe ()))
forall (r :: EffectRow).
(Member GundeckAPIAccess r,
 Member (Input NotificationSubsystemConfig) r, Member Async r,
 Member (Final IO) r, Member TinyLog r) =>
Push -> Sem r (Async (Maybe ()))
pushAsyncImpl Push
ps
  CleanupUser UserId
uid -> UserId -> Sem r ()
forall (r :: EffectRow).
Member GundeckAPIAccess r =>
UserId -> Sem r ()
GundeckAPIAccess.userDeleted UserId
uid
  UnregisterPushClient UserId
uid ClientId
cid -> UserId -> ClientId -> Sem r ()
forall (r :: EffectRow).
Member GundeckAPIAccess r =>
UserId -> ClientId -> Sem r ()
GundeckAPIAccess.unregisterPushClient UserId
uid ClientId
cid
  GetPushTokens UserId
uid -> UserId -> Sem r [PushToken]
forall (r :: EffectRow).
Member GundeckAPIAccess r =>
UserId -> Sem r [PushToken]
GundeckAPIAccess.getPushTokens UserId
uid

data NotificationSubsystemConfig = NotificationSubsystemConfig
  { NotificationSubsystemConfig -> Range 1 HardTruncationLimit Int32
fanoutLimit :: Range 1 HardTruncationLimit Int32,
    NotificationSubsystemConfig -> Natural
chunkSize :: Natural,
    NotificationSubsystemConfig -> DiffTime
slowPushDelay :: DiffTime,
    NotificationSubsystemConfig -> RequestId
requestId :: RequestId
  }

defaultNotificationSubsystemConfig :: RequestId -> NotificationSubsystemConfig
defaultNotificationSubsystemConfig :: RequestId -> NotificationSubsystemConfig
defaultNotificationSubsystemConfig RequestId
reqId =
  Range 1 HardTruncationLimit Int32
-> Natural -> DiffTime -> RequestId -> NotificationSubsystemConfig
NotificationSubsystemConfig Range 1 HardTruncationLimit Int32
defaultFanoutLimit Natural
defaultChunkSize DiffTime
defaultSlowPushDelay RequestId
reqId

defaultFanoutLimit :: Range 1 HardTruncationLimit Int32
defaultFanoutLimit :: Range 1 HardTruncationLimit Int32
defaultFanoutLimit = Proxy HardTruncationLimit -> Range 1 HardTruncationLimit Int32
forall (n :: Natural) (x :: Natural) (m :: Natural) a.
(n <= x, x <= m, KnownNat x, Num a) =>
Proxy x -> Range n m a
toRange (forall (t :: Natural). Proxy t
forall {k} (t :: k). Proxy t
Proxy @HardTruncationLimit)

defaultChunkSize :: Natural
defaultChunkSize :: Natural
defaultChunkSize = Natural
128

defaultSlowPushDelay :: DiffTime
defaultSlowPushDelay :: DiffTime
defaultSlowPushDelay = Integer -> DiffTime
millisecondsToDiffTime Integer
20

pushAsyncImpl ::
  forall r.
  ( Member GundeckAPIAccess r,
    Member (Input NotificationSubsystemConfig) r,
    Member P.Async r,
    Member (Final IO) r,
    Member P.TinyLog r
  ) =>
  Push ->
  Sem r (Async (Maybe ()))
pushAsyncImpl :: forall (r :: EffectRow).
(Member GundeckAPIAccess r,
 Member (Input NotificationSubsystemConfig) r, Member Async r,
 Member (Final IO) r, Member TinyLog r) =>
Push -> Sem r (Async (Maybe ()))
pushAsyncImpl Push
p = Sem r () -> Sem r (Async (Maybe ()))
forall (r :: EffectRow) a.
Member Async r =>
Sem r a -> Sem r (Async (Maybe a))
async (Sem r () -> Sem r (Async (Maybe ())))
-> Sem r () -> Sem r (Async (Maybe ()))
forall a b. (a -> b) -> a -> b
$ do
  RequestId
reqId <- (NotificationSubsystemConfig -> RequestId) -> Sem r RequestId
forall i j (r :: EffectRow).
Member (Input i) r =>
(i -> j) -> Sem r j
inputs NotificationSubsystemConfig -> RequestId
requestId
  forall e (r :: EffectRow) a.
Member (Final IO) r =>
Sem (Error e : r) a -> Sem r (Either e a)
errorToIOFinal @SomeException (forall e (r :: EffectRow) a.
(Exception e, Member (Error e) r, Member (Final IO) r) =>
Sem r a -> Sem r a
fromExceptionSem @SomeException (Sem (Error SomeException : r) ()
 -> Sem (Error SomeException : r) ())
-> Sem (Error SomeException : r) ()
-> Sem (Error SomeException : r) ()
forall a b. (a -> b) -> a -> b
$ [Push] -> Sem (Error SomeException : r) ()
forall (r :: EffectRow).
(Member GundeckAPIAccess r,
 Member (Input NotificationSubsystemConfig) r, Member Async r) =>
[Push] -> Sem r ()
pushImpl [Push
p]) Sem r (Either SomeException ())
-> (Either SomeException () -> Sem r ()) -> Sem r ()
forall a b. Sem r a -> (a -> Sem r b) -> Sem r b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Left SomeException
e ->
      (Msg -> Msg) -> Sem r ()
forall msg (r :: EffectRow).
Member (Logger msg) r =>
msg -> Sem r ()
P.err ((Msg -> Msg) -> Sem r ()) -> (Msg -> Msg) -> Sem r ()
forall a b. (a -> b) -> a -> b
$
        Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Error while pushing notifications")
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> RequestId -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"requestId" RequestId
reqId
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> String -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"error" (SomeException -> String
forall e. Exception e => e -> String
displayException SomeException
e)
    Right ()
_ -> () -> Sem r ()
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

pushImpl ::
  forall r.
  ( Member GundeckAPIAccess r,
    Member (Input NotificationSubsystemConfig) r,
    Member P.Async r
  ) =>
  [Push] ->
  Sem r ()
pushImpl :: forall (r :: EffectRow).
(Member GundeckAPIAccess r,
 Member (Input NotificationSubsystemConfig) r, Member Async r) =>
[Push] -> Sem r ()
pushImpl [Push]
ps = do
  Range 1 HardTruncationLimit Int32
currentFanoutLimit <- (NotificationSubsystemConfig -> Range 1 HardTruncationLimit Int32)
-> Sem r (Range 1 HardTruncationLimit Int32)
forall i j (r :: EffectRow).
Member (Input i) r =>
(i -> j) -> Sem r j
inputs NotificationSubsystemConfig -> Range 1 HardTruncationLimit Int32
fanoutLimit
  Natural
pushChunkSize <- (NotificationSubsystemConfig -> Natural) -> Sem r Natural
forall i j (r :: EffectRow).
Member (Input i) r =>
(i -> j) -> Sem r j
inputs NotificationSubsystemConfig -> Natural
chunkSize

  let [[Push]]
pushes :: [[V2.Push]] =
        Natural -> [Push] -> [[Push]]
mkPushes Natural
pushChunkSize ([Push] -> [[Push]]) -> [Push] -> [[Push]]
forall a b. (a -> b) -> a -> b
$
          Range 1 HardTruncationLimit Int32 -> [Push] -> [Push]
forall (n :: Natural) (m :: Natural).
Range n m Int32 -> [Push] -> [Push]
removeIfLargeFanout Range 1 HardTruncationLimit Int32
currentFanoutLimit [Push]
ps
  Sem r [Maybe ()] -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Sem r [Maybe ()] -> Sem r ()) -> Sem r [Maybe ()] -> Sem r ()
forall a b. (a -> b) -> a -> b
$
    [Sem r ()] -> Sem r [Maybe ()]
forall (t :: * -> *) (r :: EffectRow) a.
(Traversable t, Member Async r) =>
t (Sem r a) -> Sem r (t (Maybe a))
sequenceConcurrently ([Sem r ()] -> Sem r [Maybe ()]) -> [Sem r ()] -> Sem r [Maybe ()]
forall a b. (a -> b) -> a -> b
$
      [Push] -> Sem r ()
forall (r :: EffectRow).
Member GundeckAPIAccess r =>
[Push] -> Sem r ()
GundeckAPIAccess.pushV2 ([Push] -> Sem r ()) -> [[Push]] -> [Sem r ()]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [[Push]]
pushes

removeIfLargeFanout :: Range n m Int32 -> [Push] -> [Push]
removeIfLargeFanout :: forall (n :: Natural) (m :: Natural).
Range n m Int32 -> [Push] -> [Push]
removeIfLargeFanout Range n m Int32
limit =
  (Push -> Bool) -> [Push] -> [Push]
forall a. (a -> Bool) -> [a] -> [a]
filter \Push {NonEmpty Recipient
_pushRecipients :: NonEmpty Recipient
$sel:_pushRecipients:Push :: Push -> NonEmpty Recipient
_pushRecipients} ->
    NonEmpty Recipient -> Int
forall a. NonEmpty a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length NonEmpty Recipient
_pushRecipients Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Range n m Int32 -> Int32
forall (n :: Natural) (m :: Natural) a. Range n m a -> a
fromRange Range n m Int32
limit)

mkPushes :: Natural -> [Push] -> [[V2.Push]]
mkPushes :: Natural -> [Push] -> [[Push]]
mkPushes Natural
chunkSize = ([Push] -> [Push]) -> [[Push]] -> [[Push]]
forall a b. (a -> b) -> [a] -> [b]
map ((Push -> Push) -> [Push] -> [Push]
forall a b. (a -> b) -> [a] -> [b]
map Push -> Push
toV2Push) ([[Push]] -> [[Push]])
-> ([Push] -> [[Push]]) -> [Push] -> [[Push]]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Natural -> [Push] -> [[Push]]
chunkPushes Natural
chunkSize

{-# INLINE [1] toV2Push #-}
toV2Push :: Push -> V2.Push
toV2Push :: Push -> Push
toV2Push Push
p =
  (Maybe UserId
-> Range 1 1024 (Set Recipient) -> List1 Object -> Push
V2.newPush Push
p.pushOrigin (Set Recipient -> Range 1 1024 (Set Recipient)
forall a (n :: Natural) (m :: Natural).
(Show a, KnownNat n, KnownNat m, Within a n m) =>
a -> Range n m a
unsafeRange ([Recipient] -> Set Recipient
forall a. Ord a => [a] -> Set a
Set.fromList [Recipient]
recipients)) List1 Object
pload)
    Push -> (Push -> Push) -> Push
forall a b. a -> (a -> b) -> b
& (Maybe ConnId -> Identity (Maybe ConnId)) -> Push -> Identity Push
Lens' Push (Maybe ConnId)
V2.pushOriginConnection ((Maybe ConnId -> Identity (Maybe ConnId))
 -> Push -> Identity Push)
-> Maybe ConnId -> Push -> Push
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Push -> Maybe ConnId
_pushConn Push
p
    Push -> (Push -> Push) -> Push
forall a b. a -> (a -> b) -> b
& (Bool -> Identity Bool) -> Push -> Identity Push
Lens' Push Bool
V2.pushTransient ((Bool -> Identity Bool) -> Push -> Identity Push)
-> Bool -> Push -> Push
forall s t a b. ASetter s t a b -> b -> s -> t
.~ Push -> Bool
_pushTransient Push
p
    Push -> (Push -> Push) -> Push
forall a b. a -> (a -> b) -> b
& (Push -> Push)
-> (Priority -> Push -> Push) -> Maybe Priority -> Push -> Push
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Push -> Push
forall a. a -> a
id (ASetter Push Push Priority Priority -> Priority -> Push -> Push
forall s t a b. ASetter s t a b -> b -> s -> t
set ASetter Push Push Priority Priority
Lens' Push Priority
V2.pushNativePriority) Push
p._pushNativePriority
  where
    pload :: List1 Object
    pload :: List1 Object
pload = Object -> List1 Object
forall a. a -> List1 a
List1.singleton (Push -> Object
pushJson Push
p)
    recipients :: [V2.Recipient]
    recipients :: [Recipient]
recipients = (Recipient -> Recipient) -> [Recipient] -> [Recipient]
forall a b. (a -> b) -> [a] -> [b]
map Recipient -> Recipient
toRecipient ([Recipient] -> [Recipient]) -> [Recipient] -> [Recipient]
forall a b. (a -> b) -> a -> b
$ NonEmpty Recipient -> [Recipient]
forall a. NonEmpty a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Push
p._pushRecipients
    toRecipient :: Recipient -> V2.Recipient
    toRecipient :: Recipient -> Recipient
toRecipient Recipient
r =
      (UserId -> Route -> Recipient
recipient Recipient
r.recipientUserId Push
p._pushRoute)
        { V2._recipientClients = r.recipientClients
        }

{-# INLINE [1] chunkPushes #-}
chunkPushes :: Natural -> [Push] -> [[Push]]
chunkPushes :: Natural -> [Push] -> [[Push]]
chunkPushes Natural
maxRecipients
  | Natural
maxRecipients Natural -> Natural -> Bool
forall a. Ord a => a -> a -> Bool
> Natural
0 = Natural -> [Push] -> [Push] -> [[Push]]
go Natural
0 []
  | Bool
otherwise = [[Push]] -> [Push] -> [[Push]]
forall a b. a -> b -> a
const []
  where
    go :: Natural -> [Push] -> [Push] -> [[Push]]
go Natural
_ [] [] = []
    go Natural
_ [Push]
acc [] = [[Push]
acc]
    go Natural
n [Push]
acc (Push
y : [Push]
ys)
      | Natural
n Natural -> Natural -> Bool
forall a. Ord a => a -> a -> Bool
>= Natural
maxRecipients = [Push]
acc [Push] -> [[Push]] -> [[Push]]
forall a. a -> [a] -> [a]
: Natural -> [Push] -> [Push] -> [[Push]]
go Natural
0 [] (Push
y Push -> [Push] -> [Push]
forall a. a -> [a] -> [a]
: [Push]
ys)
      | Bool
otherwise =
          let totalLength :: Natural
totalLength = (Natural
n Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
+ Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral (NonEmpty Recipient -> Int
forall a. NonEmpty a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length Push
y._pushRecipients))
           in if Natural
totalLength Natural -> Natural -> Bool
forall a. Ord a => a -> a -> Bool
> Natural
maxRecipients
                then
                  let (Push
y1, Push
y2) = Natural -> Push -> (Push, Push)
splitPush (Natural
maxRecipients Natural -> Natural -> Natural
forall a. Num a => a -> a -> a
- Natural
n) Push
y
                   in Natural -> [Push] -> [Push] -> [[Push]]
go Natural
maxRecipients (Push
y1 Push -> [Push] -> [Push]
forall a. a -> [a] -> [a]
: [Push]
acc) (Push
y2 Push -> [Push] -> [Push]
forall a. a -> [a] -> [a]
: [Push]
ys)
                else Natural -> [Push] -> [Push] -> [[Push]]
go Natural
totalLength (Push
y Push -> [Push] -> [Push]
forall a. a -> [a] -> [a]
: [Push]
acc) [Push]
ys

    -- n must be strictly > 0 and < length (_pushRecipients p)
    splitPush :: Natural -> Push -> (Push, Push)
    splitPush :: Natural -> Push -> (Push, Push)
splitPush Natural
n Push
p =
      let ([Recipient]
r1, [Recipient]
r2) = Int -> [Recipient] -> ([Recipient], [Recipient])
forall a. Int -> [a] -> ([a], [a])
splitAt (Natural -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Natural
n) (NonEmpty Recipient -> [Recipient]
forall a. NonEmpty a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList Push
p._pushRecipients)
       in (Push
p {_pushRecipients = fromJust $ nonEmpty r1}, Push
p {_pushRecipients = fromJust $ nonEmpty r2})

pushSlowlyImpl ::
  ( Member Delay r,
    Member (Input NotificationSubsystemConfig) r,
    Member GundeckAPIAccess r,
    Member P.Async r
  ) =>
  [Push] ->
  Sem r ()
pushSlowlyImpl :: forall (r :: EffectRow).
(Member Delay r, Member (Input NotificationSubsystemConfig) r,
 Member GundeckAPIAccess r, Member Async r) =>
[Push] -> Sem r ()
pushSlowlyImpl [Push]
ps =
  [Push] -> (Push -> Sem r ()) -> Sem r ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [Push]
ps \Push
p -> do
    Int -> Sem r ()
forall (r :: EffectRow). Member Delay r => Int -> Sem r ()
delay (Int -> Sem r ()) -> Sem r Int -> Sem r ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (NotificationSubsystemConfig -> Int) -> Sem r Int
forall i j (r :: EffectRow).
Member (Input i) r =>
(i -> j) -> Sem r j
inputs (DiffTime -> Int
diffTimeToFullMicroseconds (DiffTime -> Int)
-> (NotificationSubsystemConfig -> DiffTime)
-> NotificationSubsystemConfig
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NotificationSubsystemConfig -> DiffTime
slowPushDelay)
    [Push] -> Sem r ()
forall (r :: EffectRow).
(Member GundeckAPIAccess r,
 Member (Input NotificationSubsystemConfig) r, Member Async r) =>
[Push] -> Sem r ()
pushImpl [Push
p]