module Galley.Cassandra.TeamNotifications
( interpretTeamNotificationStoreToCassandra,
)
where
import Cassandra
import Control.Monad.Catch
import Control.Retry (exponentialBackoff, limitRetries, retrying)
import Data.Aeson qualified as JSON
import Data.Id
import Data.List1 (List1)
import Data.Range (Range, fromRange)
import Data.Sequence (Seq, ViewL (..), ViewR (..), (<|), (><))
import Data.Sequence qualified as Seq
import Data.Time (nominalDay, nominalDiffTimeToSeconds)
import Data.UUID.V1 qualified as UUID
import Galley.Cassandra.Store
import Galley.Cassandra.Util
import Galley.Data.TeamNotifications
import Galley.Effects
import Galley.Effects.TeamNotificationStore (TeamNotificationStore (..))
import Imports
import Network.HTTP.Types
import Network.Wai.Utilities hiding (Error)
import Polysemy
import Polysemy.Input
import Polysemy.TinyLog hiding (err)
import Wire.API.Internal.Notification
-- | Interpret the 'TeamNotificationStore' effect against Cassandra.
--
-- Every action first emits a log line via 'logEffect' naming the action, then:
--
-- * 'CreateTeamNotification' runs 'add' through 'embedClient'.
-- * 'GetTeamNotifications' runs 'fetch' through 'embedClient'.
-- * 'MkNotificationId' embeds the plain 'IO' action 'mkNotificationId'.
interpretTeamNotificationStoreToCassandra ::
  ( Member (Embed IO) r,
    Member (Input ClientState) r,
    Member TinyLog r
  ) =>
  Sem (TeamNotificationStore ': r) a ->
  Sem r a
interpretTeamNotificationStoreToCassandra = interpret $ \case
  CreateTeamNotification tid nid objs -> do
    logEffect "TeamNotificationStore.CreateTeamNotification"
    embedClient $ add tid nid objs
  GetTeamNotifications tid mnid lim -> do
    logEffect "TeamNotificationStore.GetTeamNotifications"
    embedClient $ fetch tid mnid lim
  MkNotificationId -> do
    logEffect "TeamNotificationStore.MkNotificationId"
    embed mkNotificationId
-- | Generate a fresh notification ID from a version-1 UUID.
--
-- 'UUID.nextUUID' returns a 'Maybe' and can come back empty, so the call is
-- retried for as long as it yields 'Nothing', under a policy of at most 10
-- retries combined with exponential backoff (base delay argument @10@).
--
-- Throws (via 'throwM') a 500 @internal-error@ if no UUID could be obtained
-- once the retry policy is exhausted.
mkNotificationId :: IO NotificationId
mkNotificationId = do
  ni <- fmap Id <$> retrying x10 fun (const (liftIO UUID.nextUUID))
  maybe (throwM err) pure ni
  where
    -- Retry policy: give up after 10 retries, backing off exponentially.
    x10 = limitRetries 10 <> exponentialBackoff 10
    -- Retry predicate: ignore the retry status, retry while the result is 'Nothing'.
    fun = const (pure . isNothing)
    err = mkError status500 "internal-error" "unable to generate notification ID"
-- | Store one team notification.
--
-- The event list is JSON-encoded and wrapped in a 'Blob' (done by the view
-- pattern on the third argument), then inserted for the given team and
-- notification ID with a TTL of 'notificationTTLSeconds'. The write uses
-- 'LocalQuorum' consistency and is wrapped in the @x5@ retry settings
-- (defined outside this block).
add ::
  TeamId ->
  NotificationId ->
  List1 JSON.Object ->
  Client ()
add tid nid (Blob . JSON.encode -> payload) =
  write cqlInsert (params LocalQuorum (tid, nid, payload, notificationTTLSeconds))
    & retry x5
  where
    cqlInsert :: PrepQuery W (TeamId, NotificationId, Blob, Int32) ()
    cqlInsert =
      "INSERT INTO team_notifications \
      \(team, id, payload) VALUES \
      \(?, ?, ?) \
      \USING TTL ?"
-- | Time-to-live applied to stored team notifications: 28 days, expressed in
-- whole seconds (2419200).
notificationTTLSeconds :: Int32
notificationTTLSeconds = round $ nominalDiffTimeToSeconds $ 28 * nominalDay
-- | Fetch a page of queued notifications for a team, oldest first.
--
-- * @tid@: the team whose notifications are read.
-- * @since@: optional notification ID to start from. The lower bound is
--   inclusive (@id >= ?@), so a matching row is part of the result.
-- * third argument: the requested page size (unwrapped from its 'Range').
--
-- One row more than requested is always fetched as a look-ahead (two more
-- when @since@ is given, to make room for the inclusive start row). The
-- look-ahead element is dropped again before returning.
--
-- The 'Bool' in the returned 'ResultPage' signals that more notifications
-- exist beyond this page. It is set when Cassandra reports further pages,
-- when decoded rows were left over on the last page read, or when the
-- look-ahead element was actually dropped (that element exists in the store
-- but is not part of this response).
fetch :: TeamId -> Maybe NotificationId -> Range 1 10000 Int32 -> Client ResultPage
fetch tid since (fromRange -> size) = do
  -- Look-ahead: +1 without a start ID, +2 with one (inclusive start row).
  let size' = bool (+ 1) (+ 2) (isJust since) size
  page1 <- case TimeUuid . toUUID <$> since of
    Nothing -> paginate cqlStart (paramsP LocalQuorum (Identity tid) size') & retry x1
    Just s -> paginate cqlSince (paramsP LocalQuorum (tid, s) size') & retry x1
  -- Keep requesting pages until the data runs out or the quota is filled.
  let isize = fromIntegral size' :: Int
  (ns, more) <- collect Seq.empty isize page1
  let kept = trim (isize - 1) ns
      -- 'trim' removes at most one element; if it did, that element is still
      -- in the store, so the caller must be told there is more to fetch even
      -- when the last Cassandra page reported no further data.
      more' = more || Seq.length kept < Seq.length ns
  pure $!
    if Seq.null kept
      then ResultPage Seq.empty False
      else ResultPage kept more'
  where
    -- Accumulate up to @num@ decoded notifications, following Cassandra pages
    -- as needed. Returns the accumulated notifications and whether more data
    -- is known to exist (further pages, or decoded rows beyond the quota).
    collect ::
      Seq QueuedNotification ->
      Int ->
      Page (TimeUuid, Blob) ->
      Client (Seq QueuedNotification, Bool)
    collect acc num page =
      let ns = splitAt num $ foldr toNotif [] (result page)
          nseq = Seq.fromList (fst ns)
          more = hasMore page
          num' = num - Seq.length nseq
          acc' = acc >< nseq
       in if not more || num' == 0
            then pure (acc', more || not (null (snd ns)))
            else liftClient (nextPage page) >>= collect acc' num'

    -- Drop the last element if the sequence is longer than @l@; otherwise
    -- return it unchanged. Removes at most one element.
    trim :: Int -> Seq a -> Seq a
    trim l ns
      | Seq.length ns <= l = ns
      | otherwise = case Seq.viewr ns of
          EmptyR -> ns
          xs :> _ -> xs

    cqlStart :: PrepQuery R (Identity TeamId) (TimeUuid, Blob)
    cqlStart =
      "SELECT id, payload \
      \FROM team_notifications \
      \WHERE team = ? \
      \ORDER BY id ASC"

    cqlSince :: PrepQuery R (TeamId, TimeUuid) (TimeUuid, Blob)
    cqlSince =
      "SELECT id, payload \
      \FROM team_notifications \
      \WHERE team = ? AND id >= ? \
      \ORDER BY id ASC"
-- | Fold step that turns one stored row into a 'QueuedNotification' and
-- prepends it to the accumulator.
--
-- The row's time UUID becomes the notification ID and the blob is decoded as
-- JSON. Rows whose payload does not decode are skipped: the accumulator is
-- returned unchanged, with no error raised.
toNotif :: (TimeUuid, Blob) -> [QueuedNotification] -> [QueuedNotification]
toNotif (i, b) ns =
  maybe
    ns
    (\p1 -> queuedNotification notifId p1 : ns)
    (JSON.decode' (fromBlob b))
  where
    notifId = Id (fromTimeUuid i)