-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2022 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

-- | See also: "Galley.API.TeamNotifications".
--
-- This module is a clone of "Gundeck.Notification.Data".
--
-- FUTUREWORK: this is a work-around because it only solves *some* problems with team events.
-- We should really use a scalable message queue instead.
module Galley.Cassandra.TeamNotifications
  ( interpretTeamNotificationStoreToCassandra,
  )
where

import Cassandra
import Control.Monad.Catch
import Control.Retry (exponentialBackoff, limitRetries, retrying)
import Data.Aeson qualified as JSON
import Data.Id
import Data.List1 (List1)
import Data.Range (Range, fromRange)
import Data.Sequence (Seq, ViewL (..), ViewR (..), (<|), (><))
import Data.Sequence qualified as Seq
import Data.Time (nominalDay, nominalDiffTimeToSeconds)
import Data.UUID.V1 qualified as UUID
import Galley.Cassandra.Store
import Galley.Cassandra.Util
import Galley.Data.TeamNotifications
import Galley.Effects
import Galley.Effects.TeamNotificationStore (TeamNotificationStore (..))
import Imports
import Network.HTTP.Types
import Network.Wai.Utilities hiding (Error)
import Polysemy
import Polysemy.Input
import Polysemy.TinyLog hiding (err)
import Wire.API.Internal.Notification

interpretTeamNotificationStoreToCassandra ::
  ( Member (Embed IO) r,
    Member (Input ClientState) r,
    Member TinyLog r
  ) =>
  Sem (TeamNotificationStore ': r) a ->
  Sem r a
interpretTeamNotificationStoreToCassandra :: forall (r :: EffectRow) a.
(Member (Embed IO) r, Member (Input ClientState) r,
 Member TinyLog r) =>
Sem (TeamNotificationStore : r) a -> Sem r a
interpretTeamNotificationStoreToCassandra = (forall (rInitial :: EffectRow) x.
 TeamNotificationStore (Sem rInitial) x -> Sem r x)
-> Sem (TeamNotificationStore : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret ((forall (rInitial :: EffectRow) x.
  TeamNotificationStore (Sem rInitial) x -> Sem r x)
 -> Sem (TeamNotificationStore : r) a -> Sem r a)
-> (forall (rInitial :: EffectRow) x.
    TeamNotificationStore (Sem rInitial) x -> Sem r x)
-> Sem (TeamNotificationStore : r) a
-> Sem r a
forall a b. (a -> b) -> a -> b
$ \case
  CreateTeamNotification TeamId
tid NotificationId
nid List1 Event
objs -> do
    ByteString -> Sem r ()
forall (r :: EffectRow). Member TinyLog r => ByteString -> Sem r ()
logEffect ByteString
"TeamNotificationStore.CreateTeamNotification"
    Client () -> Sem r ()
forall (r :: EffectRow) a.
(Member (Embed IO) r, Member (Input ClientState) r) =>
Client a -> Sem r a
embedClient (Client () -> Sem r ()) -> Client () -> Sem r ()
forall a b. (a -> b) -> a -> b
$ TeamId -> NotificationId -> List1 Event -> Client ()
add TeamId
tid NotificationId
nid List1 Event
objs
  GetTeamNotifications TeamId
tid Maybe NotificationId
mnid Range 1 10000 Int32
lim -> do
    ByteString -> Sem r ()
forall (r :: EffectRow). Member TinyLog r => ByteString -> Sem r ()
logEffect ByteString
"TeamNotificationStore.GetTeamNotifications"
    Client ResultPage -> Sem r ResultPage
forall (r :: EffectRow) a.
(Member (Embed IO) r, Member (Input ClientState) r) =>
Client a -> Sem r a
embedClient (Client ResultPage -> Sem r ResultPage)
-> Client ResultPage -> Sem r ResultPage
forall a b. (a -> b) -> a -> b
$ TeamId
-> Maybe NotificationId -> Range 1 10000 Int32 -> Client ResultPage
fetch TeamId
tid Maybe NotificationId
mnid Range 1 10000 Int32
lim
  TeamNotificationStore (Sem rInitial) x
MkNotificationId -> do
    ByteString -> Sem r ()
forall (r :: EffectRow). Member TinyLog r => ByteString -> Sem r ()
logEffect ByteString
"TeamNotificationStore.MkNotificationId"
    IO x -> Sem r x
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed IO x
IO NotificationId
mkNotificationId

-- | 'Data.UUID.V1.nextUUID' is sometimes unsuccessful, so we try a few times.
mkNotificationId :: IO NotificationId
mkNotificationId :: IO NotificationId
mkNotificationId = do
  Maybe NotificationId
ni <- (UUID -> NotificationId) -> Maybe UUID -> Maybe NotificationId
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap UUID -> NotificationId
forall {k} (a :: k). UUID -> Id a
Id (Maybe UUID -> Maybe NotificationId)
-> IO (Maybe UUID) -> IO (Maybe NotificationId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> RetryPolicyM IO
-> (RetryStatus -> Maybe UUID -> IO Bool)
-> (RetryStatus -> IO (Maybe UUID))
-> IO (Maybe UUID)
forall (m :: * -> *) b.
MonadIO m =>
RetryPolicyM m
-> (RetryStatus -> b -> m Bool) -> (RetryStatus -> m b) -> m b
retrying RetryPolicyM IO
x10 RetryStatus -> Maybe UUID -> IO Bool
forall {b} {a}. b -> Maybe a -> IO Bool
fun (IO (Maybe UUID) -> RetryStatus -> IO (Maybe UUID)
forall a b. a -> b -> a
const (IO (Maybe UUID) -> IO (Maybe UUID)
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Maybe UUID)
UUID.nextUUID))
  IO NotificationId
-> (NotificationId -> IO NotificationId)
-> Maybe NotificationId
-> IO NotificationId
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Error -> IO NotificationId
forall e a. (HasCallStack, Exception e) => e -> IO a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM Error
err) NotificationId -> IO NotificationId
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe NotificationId
ni
  where
    x10 :: RetryPolicyM IO
x10 = Int -> RetryPolicy
limitRetries Int
10 RetryPolicyM IO -> RetryPolicyM IO -> RetryPolicyM IO
forall a. Semigroup a => a -> a -> a
<> Int -> RetryPolicyM IO
forall (m :: * -> *). Monad m => Int -> RetryPolicyM m
exponentialBackoff Int
10
    fun :: b -> Maybe a -> IO Bool
fun = (Maybe a -> IO Bool) -> b -> Maybe a -> IO Bool
forall a b. a -> b -> a
const (Bool -> IO Bool
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> IO Bool) -> (Maybe a -> Bool) -> Maybe a -> IO Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing)
    err :: Error
err = Status -> LText -> LText -> Error
mkError Status
status500 LText
"internal-error" LText
"unable to generate notification ID"

-- FUTUREWORK: the magic 32 should be made configurable, so it can be tuned
add ::
  TeamId ->
  NotificationId ->
  List1 JSON.Object ->
  Client ()
add :: TeamId -> NotificationId -> List1 Event -> Client ()
add TeamId
tid NotificationId
nid (ByteString -> Blob
Blob (ByteString -> Blob)
-> (List1 Event -> ByteString) -> List1 Event -> Blob
forall b c a. (b -> c) -> (a -> b) -> a -> c
. List1 Event -> ByteString
forall a. ToJSON a => a -> ByteString
JSON.encode -> Blob
payload) =
  PrepQuery W (TeamId, NotificationId, Blob, Int32) ()
-> QueryParams (TeamId, NotificationId, Blob, Int32) -> Client ()
forall (m :: * -> *) a (q :: * -> * -> * -> *).
(MonadClient m, Tuple a, RunQ q) =>
q W a () -> QueryParams a -> m ()
write PrepQuery W (TeamId, NotificationId, Blob, Int32) ()
cqlInsert (Consistency
-> (TeamId, NotificationId, Blob, Int32)
-> QueryParams (TeamId, NotificationId, Blob, Int32)
forall a. Consistency -> a -> QueryParams a
params Consistency
LocalQuorum (TeamId
tid, NotificationId
nid, Blob
payload, Int32
notificationTTLSeconds)) Client () -> (Client () -> Client ()) -> Client ()
forall a b. a -> (a -> b) -> b
& RetrySettings -> Client () -> Client ()
forall (m :: * -> *) a.
MonadClient m =>
RetrySettings -> m a -> m a
retry RetrySettings
x5
  where
    cqlInsert :: PrepQuery W (TeamId, NotificationId, Blob, Int32) ()
    cqlInsert :: PrepQuery W (TeamId, NotificationId, Blob, Int32) ()
cqlInsert =
      PrepQuery W (TeamId, NotificationId, Blob, Int32) ()
"INSERT INTO team_notifications \
      \(team, id, payload) VALUES \
      \(?, ?, ?) \
      \USING TTL ?"

-- |
--
-- >>> import Data.Time
-- >>> formatTime defaultTimeLocale "%d days, %H hours, %M minutes, %S seconds" (secondsToNominalDiffTime (fromIntegral notificationTTLSeconds))
-- "28 days, 0 hours, 0 minutes, 0 seconds"
notificationTTLSeconds :: Int32
notificationTTLSeconds :: Int32
notificationTTLSeconds = Pico -> Int32
forall b. Integral b => Pico -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (Pico -> Int32) -> Pico -> Int32
forall a b. (a -> b) -> a -> b
$ NominalDiffTime -> Pico
nominalDiffTimeToSeconds (NominalDiffTime -> Pico) -> NominalDiffTime -> Pico
forall a b. (a -> b) -> a -> b
$ NominalDiffTime
28 NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* NominalDiffTime
nominalDay

fetch :: TeamId -> Maybe NotificationId -> Range 1 10000 Int32 -> Client ResultPage
fetch :: TeamId
-> Maybe NotificationId -> Range 1 10000 Int32 -> Client ResultPage
fetch TeamId
tid Maybe NotificationId
since (Range 1 10000 Int32 -> Int32
forall (n :: Nat) (m :: Nat) a. Range n m a -> a
fromRange -> Int32
size) = do
  -- We always need to look for one more than requested in order to correctly
  -- report whether there are more results.
  let size' :: Int32
size' = (Int32 -> Int32) -> (Int32 -> Int32) -> Bool -> Int32 -> Int32
forall a. a -> a -> Bool -> a
bool (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
1) (Int32 -> Int32 -> Int32
forall a. Num a => a -> a -> a
+ Int32
2) (Maybe NotificationId -> Bool
forall a. Maybe a -> Bool
isJust Maybe NotificationId
since) Int32
size
  Page (TimeUuid, Blob)
page1 <- case UUID -> TimeUuid
TimeUuid (UUID -> TimeUuid)
-> (NotificationId -> UUID) -> NotificationId -> TimeUuid
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NotificationId -> UUID
forall {k} (a :: k). Id a -> UUID
toUUID (NotificationId -> TimeUuid)
-> Maybe NotificationId -> Maybe TimeUuid
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe NotificationId
since of
    Maybe TimeUuid
Nothing -> PrepQuery R (Identity TeamId) (TimeUuid, Blob)
-> QueryParams (Identity TeamId) -> Client (Page (TimeUuid, Blob))
forall (m :: * -> *) a b (q :: * -> * -> * -> *).
(MonadClient m, Tuple a, Tuple b, RunQ q) =>
q R a b -> QueryParams a -> m (Page b)
paginate PrepQuery R (Identity TeamId) (TimeUuid, Blob)
cqlStart (Consistency
-> Identity TeamId -> Int32 -> QueryParams (Identity TeamId)
forall a. Consistency -> a -> Int32 -> QueryParams a
paramsP Consistency
LocalQuorum (TeamId -> Identity TeamId
forall a. a -> Identity a
Identity TeamId
tid) Int32
size') Client (Page (TimeUuid, Blob))
-> (Client (Page (TimeUuid, Blob))
    -> Client (Page (TimeUuid, Blob)))
-> Client (Page (TimeUuid, Blob))
forall a b. a -> (a -> b) -> b
& RetrySettings
-> Client (Page (TimeUuid, Blob)) -> Client (Page (TimeUuid, Blob))
forall (m :: * -> *) a.
MonadClient m =>
RetrySettings -> m a -> m a
retry RetrySettings
x1
    Just TimeUuid
s -> PrepQuery R (TeamId, TimeUuid) (TimeUuid, Blob)
-> QueryParams (TeamId, TimeUuid) -> Client (Page (TimeUuid, Blob))
forall (m :: * -> *) a b (q :: * -> * -> * -> *).
(MonadClient m, Tuple a, Tuple b, RunQ q) =>
q R a b -> QueryParams a -> m (Page b)
paginate PrepQuery R (TeamId, TimeUuid) (TimeUuid, Blob)
cqlSince (Consistency
-> (TeamId, TimeUuid) -> Int32 -> QueryParams (TeamId, TimeUuid)
forall a. Consistency -> a -> Int32 -> QueryParams a
paramsP Consistency
LocalQuorum (TeamId
tid, TimeUuid
s) Int32
size') Client (Page (TimeUuid, Blob))
-> (Client (Page (TimeUuid, Blob))
    -> Client (Page (TimeUuid, Blob)))
-> Client (Page (TimeUuid, Blob))
forall a b. a -> (a -> b) -> b
& RetrySettings
-> Client (Page (TimeUuid, Blob)) -> Client (Page (TimeUuid, Blob))
forall (m :: * -> *) a.
MonadClient m =>
RetrySettings -> m a -> m a
retry RetrySettings
x1
  -- Collect results, requesting more pages until we run out of data
  -- or have found size + 1 notifications (not including the 'since').
  let isize :: Int
isize = Int32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int32
size' :: Int
  (Seq QueuedNotification
ns, Bool
more) <- Seq QueuedNotification
-> Int
-> Page (TimeUuid, Blob)
-> Client (Seq QueuedNotification, Bool)
collect Seq QueuedNotification
forall a. Seq a
Seq.empty Int
isize Page (TimeUuid, Blob)
page1
  -- Drop the extra element from the end as well.  Keep the inclusive start
  -- value in the response (if a 'since' was given and found).
  -- This can probably simplified a lot further, but we need to understand
  -- 'Seq' in order to do that.  If you find a bug, this may be a good
  -- place to start looking.
  ResultPage -> Client ResultPage
forall a. a -> Client a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ResultPage -> Client ResultPage)
-> ResultPage -> Client ResultPage
forall a b. (a -> b) -> a -> b
$! case Seq QueuedNotification -> ViewL QueuedNotification
forall a. Seq a -> ViewL a
Seq.viewl (Int -> Seq QueuedNotification -> Seq QueuedNotification
forall a. Int -> Seq a -> Seq a
trim (Int
isize Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Seq QueuedNotification
ns) of
    ViewL QueuedNotification
EmptyL -> Seq QueuedNotification -> Bool -> ResultPage
ResultPage Seq QueuedNotification
forall a. Seq a
Seq.empty Bool
False
    (QueuedNotification
x :< Seq QueuedNotification
xs) -> Seq QueuedNotification -> Bool -> ResultPage
ResultPage (QueuedNotification
x QueuedNotification
-> Seq QueuedNotification -> Seq QueuedNotification
forall a. a -> Seq a -> Seq a
<| Seq QueuedNotification
xs) Bool
more
  where
    collect ::
      Seq QueuedNotification ->
      Int ->
      Page (TimeUuid, Blob) ->
      Client (Seq QueuedNotification, Bool)
    collect :: Seq QueuedNotification
-> Int
-> Page (TimeUuid, Blob)
-> Client (Seq QueuedNotification, Bool)
collect Seq QueuedNotification
acc Int
num Page (TimeUuid, Blob)
page =
      let ns :: ([QueuedNotification], [QueuedNotification])
ns = Int
-> [QueuedNotification]
-> ([QueuedNotification], [QueuedNotification])
forall a. Int -> [a] -> ([a], [a])
splitAt Int
num ([QueuedNotification]
 -> ([QueuedNotification], [QueuedNotification]))
-> [QueuedNotification]
-> ([QueuedNotification], [QueuedNotification])
forall a b. (a -> b) -> a -> b
$ ((TimeUuid, Blob) -> [QueuedNotification] -> [QueuedNotification])
-> [QueuedNotification]
-> [(TimeUuid, Blob)]
-> [QueuedNotification]
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (TimeUuid, Blob) -> [QueuedNotification] -> [QueuedNotification]
toNotif [] (Page (TimeUuid, Blob) -> [(TimeUuid, Blob)]
forall a. Page a -> [a]
result Page (TimeUuid, Blob)
page)
          nseq :: Seq QueuedNotification
nseq = [QueuedNotification] -> Seq QueuedNotification
forall a. [a] -> Seq a
Seq.fromList (([QueuedNotification], [QueuedNotification])
-> [QueuedNotification]
forall a b. (a, b) -> a
fst ([QueuedNotification], [QueuedNotification])
ns)
          more :: Bool
more = Page (TimeUuid, Blob) -> Bool
forall a. Page a -> Bool
hasMore Page (TimeUuid, Blob)
page
          num' :: Int
num' = Int
num Int -> Int -> Int
forall a. Num a => a -> a -> a
- Seq QueuedNotification -> Int
forall a. Seq a -> Int
Seq.length Seq QueuedNotification
nseq
          acc' :: Seq QueuedNotification
acc' = Seq QueuedNotification
acc Seq QueuedNotification
-> Seq QueuedNotification -> Seq QueuedNotification
forall a. Seq a -> Seq a -> Seq a
>< Seq QueuedNotification
nseq
       in if Bool -> Bool
not Bool
more Bool -> Bool -> Bool
|| Int
num' Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
            then (Seq QueuedNotification, Bool)
-> Client (Seq QueuedNotification, Bool)
forall a. a -> Client a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Seq QueuedNotification
acc', Bool
more Bool -> Bool -> Bool
|| Bool -> Bool
not ([QueuedNotification] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (([QueuedNotification], [QueuedNotification])
-> [QueuedNotification]
forall a b. (a, b) -> b
snd ([QueuedNotification], [QueuedNotification])
ns)))
            else Client (Page (TimeUuid, Blob)) -> Client (Page (TimeUuid, Blob))
forall a. Client a -> Client a
forall (m :: * -> *) a. MonadClient m => Client a -> m a
liftClient (Page (TimeUuid, Blob) -> Client (Page (TimeUuid, Blob))
forall a. Page a -> Client (Page a)
nextPage Page (TimeUuid, Blob)
page) Client (Page (TimeUuid, Blob))
-> (Page (TimeUuid, Blob) -> Client (Seq QueuedNotification, Bool))
-> Client (Seq QueuedNotification, Bool)
forall a b. Client a -> (a -> Client b) -> Client b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Seq QueuedNotification
-> Int
-> Page (TimeUuid, Blob)
-> Client (Seq QueuedNotification, Bool)
collect Seq QueuedNotification
acc' Int
num'
    trim :: Int -> Seq a -> Seq a
    trim :: forall a. Int -> Seq a -> Seq a
trim Int
l Seq a
ns
      | Seq a -> Int
forall a. Seq a -> Int
Seq.length Seq a
ns Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
l = Seq a
ns
      | Bool
otherwise = case Seq a -> ViewR a
forall a. Seq a -> ViewR a
Seq.viewr Seq a
ns of
          ViewR a
EmptyR -> Seq a
ns
          Seq a
xs :> a
_ -> Seq a
xs
    cqlStart :: PrepQuery R (Identity TeamId) (TimeUuid, Blob)
    cqlStart :: PrepQuery R (Identity TeamId) (TimeUuid, Blob)
cqlStart =
      PrepQuery R (Identity TeamId) (TimeUuid, Blob)
"SELECT id, payload \
      \FROM team_notifications \
      \WHERE team = ? \
      \ORDER BY id ASC"
    cqlSince :: PrepQuery R (TeamId, TimeUuid) (TimeUuid, Blob)
    cqlSince :: PrepQuery R (TeamId, TimeUuid) (TimeUuid, Blob)
cqlSince =
      PrepQuery R (TeamId, TimeUuid) (TimeUuid, Blob)
"SELECT id, payload \
      \FROM team_notifications \
      \WHERE team = ? AND id >= ? \
      \ORDER BY id ASC"

-------------------------------------------------------------------------------
-- Conversions

toNotif :: (TimeUuid, Blob) -> [QueuedNotification] -> [QueuedNotification]
toNotif :: (TimeUuid, Blob) -> [QueuedNotification] -> [QueuedNotification]
toNotif (TimeUuid
i, Blob
b) [QueuedNotification]
ns =
  [QueuedNotification]
-> (NonEmpty Event -> [QueuedNotification])
-> Maybe (NonEmpty Event)
-> [QueuedNotification]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
    [QueuedNotification]
ns
    (\NonEmpty Event
p1 -> NotificationId -> NonEmpty Event -> QueuedNotification
queuedNotification NotificationId
notifId NonEmpty Event
p1 QueuedNotification -> [QueuedNotification] -> [QueuedNotification]
forall a. a -> [a] -> [a]
: [QueuedNotification]
ns)
    ( ByteString -> Maybe (NonEmpty Event)
forall a. FromJSON a => ByteString -> Maybe a
JSON.decode' (Blob -> ByteString
fromBlob Blob
b)
    -- FUTUREWORK: this is from the database, so it's slightly more ok to ignore parse
    -- errors than if it's data provided by a client.  it would still be better to have an
    -- error entry in the log file and crash, rather than ignore the error and continue.
    )
  where
    notifId :: NotificationId
notifId = UUID -> NotificationId
forall {k} (a :: k). UUID -> Id a
Id (TimeUuid -> UUID
fromTimeUuid TimeUuid
i)