{-# OPTIONS_GHC -Wno-ambiguous-fields #-}

-- This file is part of the Wire Server implementation.
--
-- Copyright (C) 2025 Wire Swiss GmbH <opensource@wire.com>
--
-- This program is free software: you can redistribute it and/or modify it under
-- the terms of the GNU Affero General Public License as published by the Free
-- Software Foundation, either version 3 of the License, or (at your option) any
-- later version.
--
-- This program is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-- FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
-- details.
--
-- You should have received a copy of the GNU Affero General Public License along
-- with this program. If not, see <https://www.gnu.org/licenses/>.

module Test.Events where

import API.Brig
import API.BrigCommon
import API.Common
import API.Galley
import API.Gundeck
import qualified API.GundeckInternal as GundeckInternal
import qualified Control.Concurrent.Timeout as Timeout
import Control.Lens ((.~), (^?!))
import Control.Monad.Codensity
import Control.Monad.RWS (asks)
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Control.Retry
import Data.ByteString.Conversion (toByteString')
import qualified Data.ProtoLens as Proto
import Data.ProtoLens.Labels ()
import Data.Proxy (Proxy (..))
import qualified Data.Text as Text
import Data.Timeout
import Network.AMQP.Extended
import Network.RabbitMqAdmin
import qualified Network.WebSockets as WS
import Notifications
import Numeric.Lens
import qualified Proto.Otr as Proto
import qualified Proto.Otr_Fields as Proto
import Servant.API (AsApi, ToServant, toServant)
import Servant.API.Generic (fromServant)
import Servant.Client (AsClientT)
import qualified Servant.Client as Servant
import SetupHelpers
import Testlib.Prelude
import Testlib.ResourcePool
import UnliftIO hiding (handle)

-- | A client with the @consumable-notifications@ capability consumes its
-- events over a single events websocket.
--
-- The test checks that:
--
-- * the first event is the @user.client-add@ for the client itself,
-- * no further event arrives before or after that event is acked,
-- * a handle change produces a @user.update@ event on the same websocket,
-- * 'getNotifications' returns an empty list for everything after the
--   notification that was the last one before the client was added.
testConsumeEventsOneWebSocket :: (HasCallStack) => App ()
testConsumeEventsOneWebSocket = do
  alice <- randomUser OwnDomain def

  -- Keep polling while the endpoint still answers 404: constant delay of
  -- 10_000 (microseconds, per "Control.Retry") and at most 10 retries.
  lastNotifResp <-
    retrying
      (constantDelay 10_000 <> limitRetries 10)
      (\_ resp -> pure $ resp.status == 404)
      (\_ -> getLastNotification alice def)
  lastNotifId <- lastNotifResp.json %. "id" & asString

  client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  clientId <- objId client

  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    -- The callback returns the delivery tag so the event can be acked below.
    deliveryTag <- assertEvent ws $ \e -> do
      e %. "type" `shouldMatch` "event"
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId
      e %. "data.delivery_tag"
    assertNoEvent_ ws

    sendAck ws deliveryTag False
    assertNoEvent_ ws

    handle <- randomHandle
    putHandle alice handle >>= assertSuccess

    assertEvent ws $ \e -> do
      e %. "type" `shouldMatch` "event"
      e %. "data.event.payload.0.type" `shouldMatch` "user.update"
      e %. "data.event.payload.0.user.handle" `shouldMatch` handle

  -- No new notifications should be stored in Cassandra as the user doesn't have
  -- any legacy clients
  getNotifications alice def {since = Just lastNotifId} `bindResponse` \resp -> do
    resp.status `shouldMatchInt` 200
    shouldBeEmpty $ resp.json %. "notifications"

-- | Runs against a backend whose cannon config overrides
-- @wsOpts.activityTimeout@ and @wsOpts.pongTimeout@ to @1000000@.
--
-- After receiving and acking the initial @user.client-add@ event, the test
-- requires @killWebSocketClient ws@ to return before the surrounding
-- @timeout 2500000@ expires; otherwise it fails with
-- "Expected web socket timeout".
--
-- NOTE(review): the units of the two cannon settings and the exact behaviour
-- of 'killWebSocketClient' are not visible in this part of the file — confirm
-- before changing either number.
testWebSocketTimeout :: (HasCallStack) => App ()
testWebSocketTimeout = withModifiedBackend
  def
    { cannonCfg =
        setField "wsOpts.activityTimeout" (1000000 :: Int)
          >=> setField "wsOpts.pongTimeout" (1000000 :: Int)
    }
  $ \domain -> do
    alice <- randomUser domain def
    client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
    clientId <- objId client

    runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
      -- The callback returns the delivery tag so the event can be acked below.
      deliveryTag <- assertEvent ws $ \e -> do
        e %. "type" `shouldMatch` "event"
        e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
        e %. "data.event.payload.0.client.id" `shouldMatch` clientId
        e %. "data.delivery_tag"
      sendAck ws deliveryTag False

      -- 'Nothing' means the action did not finish within the time limit.
      result <- timeout 2500000 (killWebSocketClient ws)
      when (isNothing result)
        $ assertFailure "Expected web socket timeout"

-- | A websocket opened without a client id (the "temp" websocket) and a
-- websocket bound to a concrete client both receive the @user.client-add@
-- event of a client that is added while both are connected.
--
-- The client-bound websocket is drained of its own @user.client-add@ event
-- before the temp websocket is opened, and after the shared event has been
-- acked on both sockets neither of them may see any further event.
testConsumeTempEvents :: (HasCallStack) => App ()
testConsumeTempEvents = do
  alice <- randomUser OwnDomain def

  client0 <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  clientId0 <- objId client0

  lowerCodensity $ do
    ws0 <- createEventsWebSocket alice (Just clientId0)

    -- Ensure there is no race between event for this client being pushed and temp
    -- consumer being created
    lift $ do
      expectAndAckNewClientEvent ws0 clientId0
      assertNoEvent_ ws0

    wsTemp <- createEventsWebSocket alice Nothing

    lift $ do
      client1 <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
      clientId1 <- objId client1

      -- Temp client gets this event as it happens after temp client has started
      -- listening
      void $ expectAndAckNewClientEvent wsTemp clientId1

      -- Client0 should also be notified even if there is a temp client
      void $ expectAndAckNewClientEvent ws0 clientId1

      assertNoEvent_ wsTemp
      assertNoEvent_ ws0
  where
    -- Expect the next event on @ws@ to be a @user.client-add@ for client
    -- @cid@, then ack it.
    expectAndAckNewClientEvent :: EventWebSocket -> String -> App ()
    expectAndAckNewClientEvent ws cid =
      assertEvent ws $ \e -> do
        e %. "type" `shouldMatch` "event"
        e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
        e %. "data.event.payload.0.client.id" `shouldMatch` cid

        ackEvent ws e

-- | Checks, via the RabbitMQ admin API of a freshly started dynamic backend,
-- that the set of queues in the backend's vhost is the same before a temp
-- events websocket is opened and after it has been closed.
--
-- While the websocket is open the vhost is expected to contain exactly 5
-- queues, i.e. one more than the four listed before and after.
testTemporaryQueuesAreDeletedAfterUse :: (HasCallStack) => App ()
testTemporaryQueuesAreDeletedAfterUse = do
  startDynamicBackendsReturnResources [def] $ \[beResource] -> do
    let domain = beResource.berDomain
    rabbitmqAdmin <- mkRabbitMqAdminClientForResource beResource
    [alice, bob] <- createAndConnectUsers [domain, domain]

    -- Create client for alice, so their temp websocket works
    aliceClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
    aliceId <- asString $ alice %. "qualified_id.id"
    aliceClientId <- asString $ aliceClient %. "id"

    let aliceClientQueueName = "user-notifications." <> aliceId <> "." <> aliceClientId
        aliceClientQueue = Queue {name = fromString aliceClientQueueName, vhost = fromString beResource.berVHost}
        deadNotifsQueue = Queue {name = fromString "dead-user-notifications", vhost = fromString beResource.berVHost}
        cellsEventsQueue = Queue {name = fromString "cells_events", vhost = fromString beResource.berVHost}
        backgroundJobsQueue = Queue {name = fromString "background-jobs", vhost = fromString beResource.berVHost}

    -- Wait for queue for the new client to be created
    eventually $ do
      queuesBeforeWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
      queuesBeforeWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue, backgroundJobsQueue]

    runCodensity (createEventsWebSocket alice Nothing) $ \ws -> do
      handle <- randomHandle
      putHandle bob handle >>= assertSuccess

      -- Four queues are expected outside the websocket scope; the fifth is
      -- presumably the temporary queue backing this websocket.
      queuesDuringWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
      addJSONToFailureContext "queuesDuringWS" queuesDuringWS $ do
        length queuesDuringWS.items `shouldMatchInt` 5

      -- We cannot use 'assertEvent' here because there is a race between the temp
      -- queue being created and rabbitmq fanning out the previous events.
      void $ assertFindsEvent ws $ \e -> do
        e %. "type" `shouldMatch` "event"
        e %. "data.event.payload.0.type" `shouldMatch` "user.update"
        e %. "data.event.payload.0.user.id" `shouldMatch` objId bob
        e %. "data.event.payload.0.user.handle" `shouldMatch` handle

        ackEvent ws e

    -- Once the websocket scope has been left, the queue set must return to
    -- what it was before.
    eventually $ do
      queuesAfterWS <- rabbitmqAdmin.listQueuesByVHost (fromString beResource.berVHost) (fromString "") True 100 1
      queuesAfterWS.items `shouldMatchSet` [deadNotifsQueue, cellsEventsQueue, aliceClientQueue, backgroundJobsQueue]

-- | Alice sends a Proteus message from her @consumable-notifications@ client,
-- addressed to Bob's client and to Alice's own old client.
--
-- The test checks that:
--
-- * Bob's events websocket eventually yields a
--   @conversation.otr-message-add@ event whose text decodes to
--   @"hello, bob"@,
-- * the events websocket of Alice's sending client yields exactly
--   @user.client-add@ followed by @conversation.create@ and then nothing
--   else, i.e. the message is not delivered back to the sending client.
testSendMessageNoReturnToSenderWithConsumableNotificationsProteus :: (HasCallStack) => App ()
testSendMessageNoReturnToSenderWithConsumableNotificationsProteus = do
  (alice, tid, bob : _) <- createTeam OwnDomain 2
  aliceOldClient <- addClient alice def >>= getJSON 201
  aliceClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  aliceClientId <- objId aliceClient
  bobClient <- addClient bob def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  bobClientId <- objId bobClient
  conv <- postConversation alice defProteus {team = Just tid, qualifiedUsers = [bob]} >>= getJSON 201
  msg <- mkProteusRecipients alice [(bob, [bobClient]), (alice, [aliceOldClient])] "hello, bob"

  -- The sender is the new client; its id string is read through the 'hex'
  -- prism to obtain the numeric client id used by the protobuf message.
  let protoMsg =
        Proto.defMessage @Proto.QualifiedNewOtrMessage
          & #sender . Proto.client .~ (aliceClientId ^?! hex)
          & #recipients .~ [msg]
          & #reportAll .~ Proto.defMessage
  postProteusMessage alice conv protoMsg >>= assertSuccess

  runCodensity (createEventsWebSocket bob (Just bobClientId)) $ \ws -> do
    assertFindsEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "conversation.otr-message-add"
      e %. "data.event.payload.0.data.text" `shouldMatchBase64` fromString "hello, bob"
      ackEvent ws e

  runCodensity (createEventsWebSocket alice (Just aliceClientId)) $ \ws -> do
    assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      ackEvent ws e
    assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "conversation.create"
      ackEvent ws e
    assertNoEvent_ ws

-- | Pushes two events through the internal Gundeck push endpoint, each
-- addressed to exactly one of Alice's clients.
--
-- Client 1 has the @consumable-notifications@ capability, client 2 is added
-- with default settings. The websocket of client 1 must receive only the
-- event addressed to client 1, and the temp websocket (opened without a
-- client id) must receive neither.
testEventsForSpecificClients :: (HasCallStack) => App ()
testEventsForSpecificClients = do
  alice <- randomUser OwnDomain def
  uid <- objId alice
  client1 <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  cid1 <- objId client1
  client2 <- addClient alice def >>= getJSON 201
  cid2 <- objId client2

  lowerCodensity $ do
    ws1 <- createEventsWebSocket alice (Just cid1)
    wsTemp <- createEventsWebSocket alice Nothing
    lift $ do
      -- Drain whatever is already queued for client 1 so the assertions
      -- below only see the pushes made by this test.
      void $ consumeAllEvents ws1

      let eventForClient1 =
            object
              [ "recipients" .= [object ["user_id" .= uid, "clients" .= [cid1], "route" .= "any"]],
                "payload" .= [object ["hello" .= "client1"]]
              ]
          eventForClient2 =
            object
              [ "recipients" .= [object ["user_id" .= uid, "clients" .= [cid2], "route" .= "any"]],
                "payload" .= [object ["hello" .= "client2"]]
              ]

      GundeckInternal.postPush OwnDomain [eventForClient1, eventForClient2] >>= assertSuccess
      assertEvent ws1 $ \e ->
        e %. "data.event.payload.0.hello" `shouldMatch` "client1"

      addFailureContext "client 1 should not get any events meant for client 2"
        $ assertNoEvent_ ws1

      addFailureContext "temp client should not get any events meant solely for client 1 or 2"
        $ assertNoEvent_ wsTemp

-- | Two independent users each open an events websocket for their own
-- @consumable-notifications@ client at the same time. Each websocket must
-- deliver exactly the @user.client-add@ event of its own client and nothing
-- else.
testConsumeEventsForDifferentUsers :: (HasCallStack) => App ()
testConsumeEventsForDifferentUsers = do
  alice <- randomUser OwnDomain def
  bob <- randomUser OwnDomain def

  aliceClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  aliceClientId <- objId aliceClient

  bobClient <- addClient bob def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  bobClientId <- objId bobClient

  lowerCodensity $ do
    aliceWS <- createEventsWebSocket alice (Just aliceClientId)
    bobWS <- createEventsWebSocket bob (Just bobClientId)
    lift $ assertClientAdd aliceClientId aliceWS
    lift $ assertClientAdd bobClientId bobWS
  where
    -- Expect a @user.client-add@ event for @clientId@ on @ws@, check that no
    -- other event follows, then ack the event using its delivery tag.
    assertClientAdd :: (HasCallStack) => String -> EventWebSocket -> App ()
    assertClientAdd clientId ws = do
      deliveryTag <- assertEvent ws $ \e -> do
        e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
        e %. "data.event.payload.0.client.id" `shouldMatch` clientId
        e %. "data.delivery_tag"
      assertNoEvent_ ws
      sendAck ws deliveryTag False

-- | A user with both a legacy client (no capabilities) and a client with the
-- @consumable-notifications@ capability: the legacy websocket and the events
-- websocket both see the @user.client-add@ event, and the notifications are
-- still retrievable via 'getNotifications'.
testConsumeEventsWhileHavingLegacyClients :: (HasCallStack) => App ()
testConsumeEventsWhileHavingLegacyClients = do
  alice <- randomUser OwnDomain def

  -- Even if alice has no clients, the notifications should still be persisted
  -- in Cassandra. This choice is kinda arbitrary as these notifications
  -- probably don't mean much, however, it ensures backwards compatibility.
  lastNotifId <-
    awaitNotification alice noValue (const $ pure True) >>= \notif -> do
      notif %. "payload.0.type" `shouldMatch` "user.activate"
      -- There is only one notification (at the time of writing), so we assume
      -- it to be the last one.
      notif %. "id" & asString

  oldClient <- addClient alice def {acapabilities = Just []} >>= getJSON 201

  withWebSocket (alice, "anything-but-conn", oldClient %. "id") $ \oldWS -> do
    newClient <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
    newClientId <- newClient %. "id" & asString

    -- The legacy websocket is told about the new client.
    oldNotif <- awaitMatch isUserClientAddNotif oldWS
    oldNotif %. "payload.0.client.id" `shouldMatch` newClientId

    -- The new client sees its own client-add event on the events websocket.
    runCodensity (createEventsWebSocket alice (Just newClientId)) $ \ws -> do
      assertEvent ws $ \e -> do
        e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
        e %. "data.event.payload.0.client.id" `shouldMatch` newClientId

  -- All notifs are also in Cassandra because of the legacy client
  getNotifications alice def {since = Just lastNotifId} `bindResponse` \resp -> do
    resp.status `shouldMatchInt` 200
    resp.json %. "notifications.0.payload.0.type" `shouldMatch` "user.client-add"
    resp.json %. "notifications.1.payload.0.type" `shouldMatch` "user.client-add"

-- | An event that was received but not acked is delivered again on the next
-- events websocket connection; once it has been acked, a further connection
-- sees no event.
testConsumeEventsAcks :: (HasCallStack) => App ()
testConsumeEventsAcks = do
  alice <- randomUser OwnDomain def
  client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  clientId <- objId client

  -- First connection: receive the event, but deliberately do not ack it.
  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId

  -- without ack, we receive the same event again
  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    deliveryTag <- assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId
      e %. "data.delivery_tag"
    sendAck ws deliveryTag False

  -- After the ack there is nothing left to deliver.
  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    assertNoEvent_ ws

-- | Two events are received on one connection and only the delivery tag of
-- the second one is acked (with the flag set to 'True'); afterwards a new
-- connection sees no event at all.
testConsumeEventsMultipleAcks :: (HasCallStack) => App ()
testConsumeEventsMultipleAcks = do
  alice <- randomUser OwnDomain def
  client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  clientId <- objId client

  handle <- randomHandle
  putHandle alice handle >>= assertSuccess

  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    -- The client-add event is received but never acked individually.
    assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId

    deliveryTag <- assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.update"
      e %. "data.event.payload.0.user.handle" `shouldMatch` handle
      e %. "data.delivery_tag"

    -- NOTE(review): the final argument presumably requests a "multiple" ack
    -- that also covers earlier delivery tags -- confirm against 'sendAck'.
    sendAck ws deliveryTag True

  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    assertNoEvent_ ws

-- | Acking only the newer of two events leaves the older one unacked: the
-- older event is delivered again on the next connection, and after acking it
-- too, a further connection sees no event.
testConsumeEventsAckNewEventWithoutAckingOldOne :: (HasCallStack) => App ()
testConsumeEventsAckNewEventWithoutAckingOldOne = do
  alice <- randomUser OwnDomain def
  client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  clientId <- objId client

  handle <- randomHandle
  putHandle alice handle >>= assertSuccess

  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId

    deliveryTagHandleAdd <- assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.update"
      e %. "data.event.payload.0.user.handle" `shouldMatch` handle
      e %. "data.delivery_tag"

    -- Only ack the handle add delivery tag
    sendAck ws deliveryTagHandleAdd False

  -- Expect client-add event to be delivered again.
  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    deliveryTagClientAdd <- assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId
      e %. "data.delivery_tag"

    sendAck ws deliveryTagClientAdd False

  -- Both events are acked now, so nothing is left to deliver.
  runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
    assertNoEvent_ ws

-- | With gundeck's @settings.notificationTTL@ set to one second, an event
-- that outlives the TTL is replaced by a @notifications_missed@ message.
-- After the full sync is acked, the later event is delivered normally and the
-- queue is then empty.
testEventsDeadLettered :: (HasCallStack) => App ()
testEventsDeadLettered = do
  let notifTTL = 1 # Second
  withModifiedBackend (def {gundeckCfg = setField "settings.notificationTTL" (notifTTL #> Second)}) $ \domain -> do
    alice <- randomUser domain def

    -- This generates an event
    client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
    clientId <- objId client

    -- We expire the add client event by waiting it out
    Timeout.threadDelay (notifTTL + 500 # MilliSecond)

    -- Generate a second event
    handle1 <- randomHandle
    putHandle alice handle1 >>= assertSuccess

    runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
      assertEvent ws $ \e -> do
        e %. "type" `shouldMatch` "notifications_missed"

      -- Until we ack the full sync, we can't get new events
      ackFullSync ws

      -- Now we can see the next event
      assertEvent ws $ \e -> do
        e %. "data.event.payload.0.type" `shouldMatch` "user.update"
        e %. "data.event.payload.0.user.handle" `shouldMatch` handle1
        ackEvent ws e

      -- We've consumed the whole queue.
      assertNoEvent_ ws

-- | Same scenario as an expired event producing @notifications_missed@, but
-- the RabbitMQ connections whose user-provided name is
-- @dead-user-notifications-watcher@ are deleted before the event expires.
-- The @notifications_missed@ message must still arrive, followed (after the
-- full-sync ack) by the later event and the end-of-initial-sync marker.
testEventsDeadLetteredWithReconnect :: (HasCallStack) => App ()
testEventsDeadLetteredWithReconnect = do
  let notifTTL = 1 # Second
  startDynamicBackendsReturnResources [def {gundeckCfg = setField "settings.notificationTTL" (notifTTL #> Second)}] $ \[resources] -> do
    let domain :: String = resources.berDomain
    alice <- randomUser domain def

    -- This generates an event
    client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
    clientId <- objId client

    -- Force a reconnect by deleting the existing connection
    killAllDeadUserNotificationRabbitMqConns resources

    -- We expire the add client event by waiting it out
    Timeout.threadDelay (notifTTL + 500 # MilliSecond)

    -- Generate a second event
    handle1 <- randomHandle
    putHandle alice handle1 >>= assertSuccess

    runCodensity (createEventsWebSocketWithSync alice (Just clientId)) $ \(endMarker, ws) -> do
      assertEvent ws $ \e -> do
        e %. "type" `shouldMatch` "notifications_missed"

      -- Until we ack the full sync, we can't get new events
      ackFullSync ws

      -- Now we can see the next event
      assertEvent ws $ \e -> do
        e %. "data.event.payload.0.type" `shouldMatch` "user.update"
        e %. "data.event.payload.0.user.handle" `shouldMatch` handle1
        ackEvent ws e

      -- We've consumed the whole queue.
      assertEndOfIniitalSync ws endMarker
  where
    -- Waits until at least one matching connection exists, then deletes every
    -- connection found via the RabbitMQ admin API.
    killAllDeadUserNotificationRabbitMqConns :: (HasCallStack) => BackendResource -> App ()
    killAllDeadUserNotificationRabbitMqConns backend = do
      rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend
      connections <- eventually $ do
        conns <- getDeadUserNotificationConnections rabbitmqAdminClient backend.berVHost
        assertAtLeastOne conns
        pure conns
      for_ connections $ \connection -> do
        rabbitmqAdminClient.deleteConnection connection.name

    -- Lists the connections of the given vhost whose user-provided name is
    -- exactly "dead-user-notifications-watcher".
    getDeadUserNotificationConnections :: (HasCallStack) => AdminAPI (AsClientT App) -> String -> App [Connection]
    getDeadUserNotificationConnections rabbitmqAdminClient vhost = do
      connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack vhost)
      pure $ filter (\c -> Just (fromString "dead-user-notifications-watcher") == c.userProvidedName) connections

-- | With a one second notification TTL: after the client-add event has been
-- consumed and acked, a typing status sent while nobody is connected must not
-- result in any message (in particular no @notifications_missed@) on the next
-- events websocket connection.
testTransientEventsDoNotTriggerDeadLetters :: (HasCallStack) => App ()
testTransientEventsDoNotTriggerDeadLetters = do
  let notifTTL = 1 # Second
  withModifiedBackend (def {gundeckCfg = setField "settings.notificationTTL" (notifTTL #> Second)}) $ \domain -> do
    alice <- randomUser domain def
    -- Creates a non-transient event
    client <- addClient alice def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
    clientId <- objId client

    -- consume it
    runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
      assertFindsEvent ws $ \e -> do
        e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
        e %. "type" `shouldMatch` "event"
        e %. "data.event.payload.0.client.id" `shouldMatch` clientId
        deliveryTag <- e %. "data.delivery_tag"
        sendAck ws deliveryTag False

    -- Self conv ID is same as user's ID, we'll use this to send typing
    -- indicators, so we don't have to create another conv.
    selfConvId <- objQidObject alice
    -- Typing status is transient, currently no one is listening.
    sendTypingStatus alice selfConvId "started" >>= assertSuccess

    runCodensity (createEventsWebSocket alice (Just clientId)) $ \ws -> do
      assertNoEvent_ ws

-- | A typing event sent while bob is connected to the events websocket is
-- delivered to him; a typing event sent while he is disconnected is not
-- delivered later, whereas the handle updates made in the meantime still
-- arrive in order.
testTransientEvents :: (HasCallStack) => App ()
testTransientEvents = do
  (alice, _, _) <- mkUserPlusClient
  (bob, _, bobClient) <- mkUserPlusClient
  connectTwoUsers alice bob
  bobClientId <- objId bobClient

  conv <- postConversation alice defProteus {qualifiedUsers = [bob]} >>= getJSON 201

  runCodensity (createEventsWebSocketWithSync bob (Just bobClientId)) $ \(marker, bobWs) -> do
    -- Drain everything queued so far so that the next event is the typing one.
    void $ consumeEventsUntilEndOfInitialSync bobWs marker

    sendTypingStatus alice conv "started" >>= assertSuccess

    assertEvent bobWs $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing"
      e %. "data.event.payload.0.qualified_conversation" `shouldMatch` (conv %. "qualified_id")
      ackEvent bobWs e

  handle1 <- randomHandle
  putHandle bob handle1 >>= assertSuccess

  sendTypingStatus alice conv "stopped" >>= assertSuccess

  handle2 <- randomHandle
  putHandle bob handle2 >>= assertSuccess

  -- We shouldn't see the stopped typing status because we were not connected to
  -- the websocket when it was sent. The other events should still show up in
  -- order.
  runCodensity (createEventsWebSocket bob (Just bobClient)) $ \ws -> do
    for_ [handle1, handle2] $ \handle ->
      assertEvent ws $ \e -> do
        e %. "data.event.payload.0.type" `shouldMatch` "user.update"
        e %. "data.event.payload.0.user.handle" `shouldMatch` handle
        ackEvent ws e

    assertNoEvent_ ws

-- | With cannon configured for @rabbitMqMaxChannels = 2@ and
-- @rabbitMqMaxConnections = 1@, two of alice's three clients open events
-- websockets successfully and keep them open; the remaining client's
-- handshake is then expected to fail with response code 503.
testChannelLimit :: (HasCallStack) => App ()
testChannelLimit = withModifiedBackend
  ( def
      { cannonCfg =
          setField "rabbitMqMaxChannels" (2 :: Int)
            >=> setField "rabbitMqMaxConnections" (1 :: Int)
      }
  )
  $ \domain -> do
    alice <- randomUser domain def
    (client0 : clients) <-
      replicateM 3
        $ addClient alice def {acapabilities = Just ["consumable-notifications"]}
        >>= getJSON 201
        >>= (%. "id")
        >>= asString

    -- Everything runs inside one Codensity block so the websockets opened in
    -- the loop are still open when the last client tries to connect.
    lowerCodensity $ do
      for_ clients $ \c -> do
        ws <- createEventsWebSocket alice (Just c)
        lift $ assertEvent ws $ \e -> do
          e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
          e %. "data.event.payload.0.client.id" `shouldMatch` c

      -- the first client fails to connect because the server runs out of channels
      do
        eithWS <- createEventsWebSocketEither alice (Just client0) Nothing
        case eithWS of
          Left (WS.MalformedResponse respHead _) ->
            lift $ respHead.responseCode `shouldMatchInt` 503
          Left e ->
            lift $ assertFailure $ "Expected websocket to fail with response code 503, got some other handshake exception: " <> displayException e
          Right _ -> lift $ assertFailure "Expected websocket handshake to fail, but it didn't"

-- | Checks that an open events websocket dies once the RabbitMQ connections of
-- its backend are killed.
--
-- A backend resource is acquired from the pool and started as a dynamic
-- backend. A user with two @consumable-notifications@ clients is created, the
-- two @user.client-add@ events are consumed and acked on the first client's
-- websocket, and then all RabbitMQ connections of the backend are killed. The
-- websocket is expected to report 'WebSocketDied'.
testChannelKilled :: (HasCallStack) => App ()
testChannelKilled = do
  pool <- asks (.resourcePool)
  runCodensity (acquireResources 1 pool) $ \[backend] -> do
    -- Sometimes RabbitMQ still remembers connections from previous uses of the
    -- dynamic backend. So we wait to ensure that we kill connections only for
    -- our current use.
    void $ killAllRabbitMqConns backend
    waitUntilNoRabbitMqConns backend

    runCodensity (startDynamicBackend backend def) $ \_ -> do
      let domain = backend.berDomain
      alice <- randomUser domain def
      [c1, c2] <-
        replicateM 2
          $ addClient alice def {acapabilities = Just ["consumable-notifications"]}
          >>= getJSON 201
          >>= (%. "id")
          >>= asString

      runCodensity (createEventsWebSocket alice (Just c1)) $ \ws -> do
        -- If creating the user takes longer (async) than adding the clients, we get a
        -- `"user.activate"` here, so we use `assertFindsEvent`.
        assertFindsEvent ws $ \e -> do
          e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
          e %. "data.event.payload.0.client.id" `shouldMatch` c1
          ackEvent ws e

        assertEvent ws $ \e -> do
          e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
          e %. "data.event.payload.0.client.id" `shouldMatch` c2
          ackEvent ws e

        -- The RabbitMQ admin API takes some time to see new connections, so we need
        -- to try a few times.
        recoverAll (constantDelay 500_000 <> limitRetries 10) $ \_ -> do
          conns <- killAllRabbitMqConns backend
          assertAtLeastOne conns

        waitUntilNoRabbitMqConns backend

        assertNoEventHelper ws `shouldMatch` WebSocketDied

-- | Checks that, with two events websockets open for the same client, only the
-- first one receives events.
--
-- The second websocket (@ws'@) is asserted to stay silent before, between and
-- after the two @user.client-add@ events that are received and acked on the
-- first websocket (@ws@).
testSingleConsumer :: (HasCallStack) => App ()
testSingleConsumer = do
  alice <- randomUser OwnDomain def
  clientId <-
    addClient alice def {acapabilities = Just ["consumable-notifications"]}
      >>= getJSON 201
      >>= objId

  -- add a second client in order to generate one more notification
  clientId' <- addClient alice def >>= getJSON 201 >>= objId

  lowerCodensity $ do
    ws <- createEventsWebSocket alice (Just clientId)
    ws' <- createEventsWebSocket alice (Just clientId)

    -- the second websocket should get no notifications as long as the first
    -- one is connected
    lift $ assertNoEvent_ ws'

    -- the assertion block returns the delivery tag so it can be acked below
    deliveryTag1 <- lift $ assertEvent ws $ \e -> do
      e %. "type" `shouldMatch` "event"
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId
      e %. "data.delivery_tag"

    lift $ assertNoEvent_ ws'

    lift $ sendAck ws deliveryTag1 False
    lift $ assertNoEvent_ ws'

    deliveryTag2 <- lift $ assertEvent ws $ \e -> do
      e %. "type" `shouldMatch` "event"
      e %. "data.event.payload.0.type" `shouldMatch` "user.client-add"
      e %. "data.event.payload.0.client.id" `shouldMatch` clientId'
      e %. "data.delivery_tag"
    lift $ sendAck ws deliveryTag2 False

    lift $ assertNoEvent_ ws'

-- | Checks how many unacknowledged events a single websocket receives.
--
-- 550 events are pushed to one client before the websocket is opened.
-- Consuming without acking must yield exactly 500 events; after acking those,
-- at least one further event must arrive before the end of the initial sync.
testPrefetchCount :: (HasCallStack) => App ()
testPrefetchCount = do
  (alice, uid, cid) <- mkUserPlusClient
  emptyQueue alice cid

  for_ [1 :: Int .. 550] $ \i ->
    do
      -- each event carries its sequence number in the payload
      let event =
            object
              [ "recipients" .= [object ["user_id" .= uid, "clients" .= [cid], "route" .= "any"]],
                "payload" .= [object ["no" .= show i]]
              ]
      GundeckInternal.postPush OwnDomain [event] >>= assertSuccess
  runCodensity (createEventsWebSocketWithSync alice (Just cid)) \(endMarker, ws) -> do
    es <- consumeAllEventsNoAck ws
    assertBool ("First 500 events expected, got " ++ show (length es)) $ length es == 500

    forM_ es (ackEvent ws)

    es' <- consumeEventsUntilEndOfInitialSync ws endMarker
    assertBool "Receive at least one outstanding event" $ not (null es')

-- | Checks the event counts seen around the end-of-initial-sync marker.
--
-- On the first connection, after @n@ pushed events, @n + 2@ events are
-- expected in total. On a second connection with nothing pending, exactly one
-- event is expected. In both cases an event pushed afterwards is received
-- and acked, and nothing else follows it.
testEndOfInitialSync :: (HasCallStack) => App ()
testEndOfInitialSync = do
  (alice, uid, cid) <- mkUserPlusClient
  let n = 20
  replicateM_ n $ do
    GundeckInternal.postPush OwnDomain [mkEvent uid cid False] >>= assertSuccess

  runCodensity (createEventsWebSocketWithSync alice (Just cid)) \(endMarker, ws) -> do
    preExistingEvents <- consumeEventsUntilEndOfInitialSync ws endMarker
    otherEvents <- consumeAllEvents ws

    -- we expect one user.client-add event, n more events, and one sync event
    length (preExistingEvents <> otherEvents) `shouldMatchInt` (n + 2)

    -- more events should not be followed by the sync event
    GundeckInternal.postPush OwnDomain [mkEvent uid cid False] >>= assertSuccess
    assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "test"
      ackEvent ws e
    assertNoEvent_ ws

  -- when we reconnect, there are no messages but we still get the
  -- synchronization notification.
  runCodensity (createEventsWebSocketWithSync alice (Just cid)) \(endMarker, ws) -> do
    preExistingEvents <- consumeEventsUntilEndOfInitialSync ws endMarker
    otherEvents <- consumeAllEvents ws
    let events = preExistingEvents <> otherEvents
    length events `shouldMatchInt` 1

    -- more events should not be followed by synchronization event
    GundeckInternal.postPush OwnDomain [mkEvent uid cid False] >>= assertSuccess
    assertEvent ws $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "test"
      ackEvent ws e
    assertNoEvent_ ws

-- | Checks that events pushed after the websocket is opened, but before
-- anything is consumed, are delivered after the end-of-initial-sync marker.
--
-- @n@ events are pushed before connecting and @n@ more after connecting. In
-- total @n + n + 2@ events are expected, of which at least @n@ must arrive
-- after the sync event.
testEndOfInitialSyncMoreEventsAfterSyncMessage :: (HasCallStack) => App ()
testEndOfInitialSyncMoreEventsAfterSyncMessage = do
  (alice, uid, cid) <- mkUserPlusClient
  let n = 20
  replicateM_ n $ do
    GundeckInternal.postPush OwnDomain [mkEvent uid cid False] >>= assertSuccess

  runCodensity (createEventsWebSocketWithSync alice (Just cid)) \(endMarker, ws) -> do
    -- it seems this is needed to reduce flakiness,
    -- when the messages below are pushed faster than the sync message is inserted
    Timeout.threadDelay (1 # Second)

    -- before consuming, we push n more events
    replicateM_ n $ do
      GundeckInternal.postPush OwnDomain [mkEvent uid cid False] >>= assertSuccess

    preExistingEvents <- consumeEventsUntilEndOfInitialSync ws endMarker
    otherEvents <- consumeAllEvents ws

    length (preExistingEvents <> otherEvents) `shouldMatchInt` (n + n + 2)
    addFailureContext ("We should have received " <> show n <> " more events after the sync event but got " <> show (length otherEvents))
      $ (length otherEvents >= n)
      `shouldMatch` True

-- | Checks that events pushed as transient are not counted once they have
-- expired.
--
-- @n@ non-transient and @n@ transient events (see 'mkEvent') are pushed, then
-- the test waits one second before connecting. Only @n + 2@ events are
-- expected on the websocket.
testEndOfInitialSyncIgnoreExpired :: (HasCallStack) => App ()
testEndOfInitialSyncIgnoreExpired = do
  (alice, uid, cid) <- mkUserPlusClient
  let n = 20
  replicateM_ n $ do
    GundeckInternal.postPush OwnDomain [mkEvent uid cid False] >>= assertSuccess

  replicateM_ n $ do
    GundeckInternal.postPush OwnDomain [mkEvent uid cid True] >>= assertSuccess

  -- Wait for transient events to expire
  Timeout.threadDelay (1 # Second)

  runCodensity (createEventsWebSocketWithSync alice (Just cid)) $ \(endMarker, ws) -> do
    preExistingEvents <- consumeEventsUntilEndOfInitialSync ws endMarker
    otherEvents <- consumeAllEvents ws
    let events = preExistingEvents <> otherEvents
    length events `shouldMatchInt` (n + 2) -- +1 for the sync event, +1 for the client add event

-- | Checks the number of remaining events after the first two events have been
-- acknowledged with a single ack.
--
-- Two events are read, and only the delivery tag of the second one is acked,
-- with the last argument of 'sendAck' set to 'True' (presumably the
-- "multiple" flag -- confirm against 'sendAck'). The remaining events,
-- including the sync event, must number @n - 2 + 2@.
testEndOfInitialSyncAckMultiple :: (HasCallStack) => App ()
testEndOfInitialSyncAckMultiple = do
  (alice, uid, cid) <- mkUserPlusClient
  let n = 20
  replicateM_ n $ do
    GundeckInternal.postPush OwnDomain [mkEvent uid cid False] >>= assertSuccess

  runCodensity (createEventsWebSocketWithSync alice (Just cid)) $ \(endMarker, ws) -> do
    -- read the first event without acking it
    void $ assertEvent ws pure
    e <- assertEvent ws pure
    dt <- e %. "data.delivery_tag"
    -- we ack the first 2 events with one ack
    sendAck ws dt True
    let expectedNumEvents = n - 2 + 2 -- +1 for the sync event, +1 for the client add event
    preExistingEvents <- consumeEventsUntilEndOfInitialSync ws endMarker
    otherEvents <- consumeAllEvents ws
    let events = preExistingEvents <> otherEvents
    length events `shouldMatchInt` expectedNumEvents

-- | Build the JSON body of a test push.
--
-- The push has a single recipient (user @uid@, client @cid@, route @"any"@)
-- and a single payload object with @"hello": "world"@ and @"type": "test"@.
-- The @transient@ argument is copied into the top-level @"transient"@ field.
mkEvent :: (ToJSON a1, ToJSON a2) => a1 -> a2 -> Bool -> Value
mkEvent uid cid transient =
  object
    [ "recipients" .= [object ["user_id" .= uid, "clients" .= [cid], "route" .= "any"]],
      "payload" .= [object ["hello" .= "world", "type" .= "test"]],
      "transient" .= transient
    ]

-- | Checks that a @conversation.typing@ event reaches the other conversation
-- member but not the sender's own client.
--
-- When the @"federated"@ flag is set, Bob is created on 'OtherDomain',
-- otherwise on 'OwnDomain'; Alice is always on 'OwnDomain'.
testTypingIndicatorIsNotSentToOwnClient :: (HasCallStack) => TaggedBool "federated" -> App ()
testTypingIndicatorIsNotSentToOwnClient (TaggedBool federated) = do
  (alice, _, aliceClient) <- mkUserPlusClientWithDomain OwnDomain
  (bob, _, bobClient) <- mkUserPlusClientWithDomain (if federated then OtherDomain else OwnDomain)
  connectTwoUsers alice bob
  aliceClientId <- objId aliceClient
  bobClientId <- objId bobClient
  conv <- postConversation alice defProteus {qualifiedUsers = [bob]} >>= getJSON 201

  runCodensity (createEventWebSockets [(alice, Just aliceClientId), (bob, Just bobClientId)]) $ \[aliceWs, bobWs] -> do
    -- consume all events to ensure we start with a clean slate
    consumeAllEvents_ aliceWs
    consumeAllEvents_ bobWs

    -- Alice is typing
    sendTypingStatus alice conv "started" >>= assertSuccess

    -- Bob should receive the typing indicator for Alice
    assertEvent bobWs $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing"
      e %. "data.event.payload.0.qualified_conversation" `shouldMatch` (conv %. "qualified_id")
      e %. "data.event.payload.0.qualified_from" `shouldMatch` (alice %. "qualified_id")
      ackEvent bobWs e

    -- Alice should not receive the typing indicator for herself
    assertNoEvent_ aliceWs

    -- Bob is typing
    sendTypingStatus bob conv "started" >>= assertSuccess

    -- Alice should receive the typing indicator for Bob
    assertEvent aliceWs $ \e -> do
      e %. "data.event.payload.0.type" `shouldMatch` "conversation.typing"
      e %. "data.event.payload.0.qualified_conversation" `shouldMatch` (conv %. "qualified_id")
      e %. "data.event.payload.0.qualified_from" `shouldMatch` (bob %. "qualified_id")
      ackEvent aliceWs e

    -- Bob should not receive the typing indicator for himself
    assertNoEvent_ bobWs

-- | Checks that remote notifications are still delivered after the
-- @backend-notifications.<domain1>@ queue has been deleted on a dynamic
-- backend, and that the queue is recreated.
--
-- We only delete queues to clean up federated integration tests. So, we
-- mostly want to ensure we don't get stuck there.
testBackendPusherRecoversFromQueueDeletion :: (HasCallStack) => App ()
testBackendPusherRecoversFromQueueDeletion = do
  bob <- randomUser OwnDomain def

  domain1 <- asks (.domain1)

  let remotesRefreshInterval = 10000 :: Int
  startDynamicBackendsReturnResources
    [ def
        { backgroundWorkerCfg =
            setField
              "backendNotificationPusher.remotesRefreshInterval"
              remotesRefreshInterval
        }
    ]
    $ \[beResource] -> do
      let domain = beResource.berDomain
      (alice, team, [alex, alison]) <- createTeam domain 3

      -- Create a federated conversation
      connectTwoUsers alice bob
      [alexId, alisonId, bobId] <-
        forM [alex, alison, bob] (%. "qualified_id")
      let nc = defProteus {qualifiedUsers = [alexId, alisonId, bobId], team = Just team}
      void $ postConversation alice nc >>= getJSON 201

      withWebSockets [bob] $ \[wsBob] -> do
        rabbitmqAdminClient <- mkRabbitMqAdminClientForResource beResource
        -- Names of the queues listed by the RabbitMQ admin API for this
        -- backend's vhost.
        let getActiveQueues :: App [String] =
              Text.unpack . (.name)
                <$$> ( (.items)
                         <$> rabbitmqAdminClient.listQueuesByVHost
                           (fromString beResource.berVHost)
                           (fromString "")
                           True
                           100
                           1
                     )

        void $ deleteTeamMember team alice alex >>= getBody 202

        assertConvUserDeletedNotif wsBob alexId

        -- Delete the queue
        let backendNotificationQueueName = "backend-notifications." <> domain1
        void
          $ rabbitmqAdminClient.deleteQueue
            (fromString beResource.berVHost)
            (fromString backendNotificationQueueName)

        -- Ensure the queue was deleted
        eventually $ do
          queueNames <- getActiveQueues
          queueNames `shouldNotContain` [backendNotificationQueueName]

        void $ deleteTeamMember team alice alison >>= getBody 202

        -- Wait for twice the configured remotes refresh interval before
        -- checking for the queue again.
        Timeout.threadDelay . fromIntegral $ remotesRefreshInterval * 2

        -- Check that the queue was recreated
        eventually $ do
          queueNames <- getActiveQueues
          queueNames `shouldContain` [backendNotificationQueueName]

        assertConvUserDeletedNotif wsBob alisonId

----------------------------------------------------------------------
-- helpers
-- | Create a random user on the given domain and add one client with the
-- @consumable-notifications@ capability.
--
-- Returns the user value, the user's id and the client's id (both obtained
-- via 'objId').
mkUserPlusClientWithDomain :: (HasCallStack, MakesValue domain) => domain -> App (Value, String, String)
mkUserPlusClientWithDomain domain = do
  user <- randomUser domain def
  uid <- objId user
  client <- addClient user def {acapabilities = Just ["consumable-notifications"]} >>= getJSON 201
  cid <- objId client
  pure (user, uid, cid)

-- | 'mkUserPlusClientWithDomain' specialised to 'OwnDomain'.
mkUserPlusClient :: (HasCallStack) => App (Value, String, String)
mkUserPlusClient = mkUserPlusClientWithDomain OwnDomain

-- | Handle to an events websocket opened by the helpers in this module.
--
-- NOTE(review): the code that fills and drains these fields lives in
-- 'createEventsWebSocketEither' and the assertion helpers; the per-field
-- descriptions below are inferred from types and names and should be
-- confirmed there.
data EventWebSocket = EventWebSocket
  { -- | Channel carrying either a websocket connection exception or a JSON
    -- value.
    events :: Chan (Either WS.ConnectionException Value),
    -- | Presumably used to hand acknowledgement messages to the websocket
    -- thread.
    ack :: MVar (Maybe Value),
    -- | Presumably signalled to request that the websocket be closed.
    kill :: MVar (),
    -- | Presumably signalled once the websocket thread has finished.
    done :: MVar ()
  }

-- | Open one events websocket per @(user, client id)@ pair, in list order,
-- using 'createEventsWebSocket' for each.
createEventWebSockets ::
  (HasCallStack, MakesValue user) =>
  [(user, Maybe String)] ->
  Codensity App [EventWebSocket]
createEventWebSockets = traverse (uncurry createEventsWebSocket)

-- | Open an events websocket for the given user and optional client id,
-- without a sync marker.
--
-- A handshake exception from 'createEventsWebSocketEither' is turned into a
-- test failure whose message includes the exception.
createEventsWebSocket ::
  (HasCallStack, MakesValue user) =>
  user ->
  Maybe String ->
  Codensity App EventWebSocket
createEventsWebSocket user cid = do
  eithWS <- createEventsWebSocketEither user cid Nothing
  case eithWS of
    Left e -> lift $ assertFailure $ "Websocket failed to connect due to handshake exception: " <> displayException e
    Right ws -> pure ws

-- | Open an events websocket with a freshly generated sync marker.
--
-- The marker comes from 'randomId' and is passed to
-- 'createEventsWebSocketEither'. It is returned alongside the websocket so
-- that callers can pass it on (e.g. to @consumeEventsUntilEndOfInitialSync@).
-- A handshake exception is turned into a test failure whose message includes
-- the exception.
createEventsWebSocketWithSync ::
  (HasCallStack, MakesValue user) =>
  user ->
  Maybe String ->
  Codensity App (String, EventWebSocket)
createEventsWebSocketWithSync user cid = do
  syncMarker <- lift randomId
  eithWS <- createEventsWebSocketEither user cid (Just syncMarker)
  case eithWS of
    Left e -> lift $ assertFailure $ "Websocket failed to connect due to handshake exception: " <> displayException e
    Right ws -> pure (syncMarker, ws)

-- | Open a websocket to Cannon's versioned @/events@ endpoint on behalf of
-- @user@ (identified via the @Z-User@ header).
--
-- * @cid@, when present, is sent as the @client@ query parameter.
-- * @mSyncMarker@, when present, is sent as the @sync_marker@ query parameter.
--
-- The result is @Left@ when the websocket handshake throws a
-- 'WS.HandshakeException'. The test fails outright when the API version of the
-- user's domain is below 8, or when the connection has not started within the
-- configured @timeOutSeconds@.
--
-- When the surrounding 'Codensity' scope is left, 'Nothing' is put on the ack
-- channel (which makes the writer send a close frame) and the client thread is
-- awaited.
createEventsWebSocketEither ::
  (HasCallStack, MakesValue user) =>
  user ->
  Maybe String ->
  Maybe String ->
  Codensity App (Either WS.HandshakeException EventWebSocket)
createEventsWebSocketEither user cid mSyncMarker = do
  -- Decoded events ('Right') or the connection exception that ended reading ('Left').
  eventsChan <- liftIO newChan
  -- 'Just' values are sent to the server; 'Nothing' asks the writer to close.
  ackChan <- liftIO newEmptyMVar
  serviceMap <- lift $ getServiceMap =<< objDomain user
  apiVersion <- lift $ getAPIVersionFor $ objDomain user
  -- Filled once: 'Right ()' when the client app starts, 'Left' on handshake failure.
  wsStarted <- newEmptyMVar
  let minAPIVersion = 8
  lift
    . when (apiVersion < minAPIVersion)
    $ assertFailure ("Events websocket can only be created when APIVersion is at least " <> show minAPIVersion)

  -- 'varKill' requests the reader to stop; 'varDone' is filled once a close
  -- control message has been received afterwards.
  varKill <- lift $ newEmptyMVar
  varDone <- lift $ newEmptyMVar

  uid <- lift $ objId =<< objQidObject user
  let HostPort caHost caPort = serviceHostPort serviceMap Cannon
      -- Build the query string from both optional parameters together, so
      -- that a sync marker without a client id still starts with '?' instead
      -- of producing a path like "/vN/events&sync_marker=...".
      query = case (cid, mSyncMarker) of
        (Nothing, Nothing) -> ""
        (Just c, Nothing) -> "?client=" <> c
        (Nothing, Just m) -> "?sync_marker=" <> m
        (Just c, Just m) -> "?client=" <> c <> "&sync_marker=" <> m
      path = "/v" <> show apiVersion <> "/events" <> query
      caHdrs = [(fromString "Z-User", toByteString' uid)]
      -- Connection handler: signal a successful start, then run reader and
      -- writer side by side until either of them finishes.
      app conn = do
        putMVar wsStarted (Right ())
        race_
          (wsRead conn `catch` (writeChan eventsChan . Left))
          (wsWrite conn)

      -- Run the read loop until a kill is requested; after a kill, stop
      -- reading events and wait for the close control message.
      wsRead conn = withAsync (wsReadLoop conn) $ \loop -> do
        r <- race (takeMVar varKill) (wait loop)
        case r of
          Left _ -> cancel loop >> waitClosed conn
          Right _ -> pure ()

      -- Discard incoming messages until a close control message arrives.
      waitClosed conn = do
        WS.receive conn >>= \case
          WS.ControlMessage (WS.Close _ _) ->
            void $ tryPutMVar varDone ()
          _ -> waitClosed conn

      -- Decode every received message as JSON and forward it to the events channel.
      wsReadLoop conn = forever $ do
        bs <- WS.receiveData conn
        case decodeStrict' bs of
          Just n -> writeChan eventsChan (Right n)
          Nothing ->
            error $ "Failed to decode events: " ++ show bs

      -- Forward acks to the server until 'Nothing' is received, then send a close.
      wsWrite conn = do
        mAck <- takeMVar ackChan
        case mAck of
          Nothing -> WS.sendClose conn (Text.pack "")
          Just ack ->
            WS.sendBinaryData conn (encode ack)
              >> wsWrite conn

  wsThread <-
    Codensity
      $ withAsync
      $ liftIO
      $ WS.runClientWith caHost (fromIntegral caPort) path WS.defaultConnectionOptions caHdrs app
      `catch` \(e :: WS.HandshakeException) -> putMVar wsStarted (Left e)

  timeOutSeconds <- asks (.timeOutSeconds)
  mStarted <- lift $ timeout (timeOutSeconds * 1_000_000) (takeMVar wsStarted)
  case mStarted of
    Nothing -> do
      cancel wsThread
      lift $ assertFailure $ "Websocket failed to connect within " <> show timeOutSeconds <> "s"
    Just (Left e) ->
      pure (Left e)
    Just (Right ()) ->
      Codensity $ \k ->
        k (Right $ EventWebSocket eventsChan ackChan varKill varDone) `finally` do
          putMVar ackChan Nothing
          liftIO $ wait wsThread

-- | Queue an @ack_full_sync@ message on the websocket's ack channel.
ackFullSync :: (HasCallStack) => EventWebSocket -> App ()
ackFullSync ws = putMVar ws.ack (Just fullSyncAck)
  where
    fullSyncAck = object ["type" .= "ack_full_sync"]

-- | Acknowledge a single event: its @data.delivery_tag@ is looked up and sent
-- back via 'sendAck' with @multiple@ set to 'False'.
ackEvent :: (HasCallStack) => EventWebSocket -> Value -> App ()
ackEvent ws event =
  event %. "data.delivery_tag" >>= \deliveryTag ->
    sendAck ws deliveryTag False

-- | Queue an @ack@ message carrying the given delivery tag and @multiple@ flag
-- on the websocket's ack channel.
sendAck :: (HasCallStack) => EventWebSocket -> Value -> Bool -> App ()
sendAck ws deliveryTag multiple = putMVar ws.ack (Just ackMessage)
  where
    ackPayload =
      object
        [ "delivery_tag" .= deliveryTag,
          "multiple" .= multiple
        ]
    ackMessage =
      object
        [ "type" .= "ack",
          "data" .= ackPayload
        ]

-- | Signal the websocket's @kill@ variable (a no-op if it is already set) and
-- then block until its @done@ variable has been filled.
killWebSocketClient :: EventWebSocket -> App ()
killWebSocketClient ws =
  tryPutMVar ws.kill () *> takeMVar ws.done

-- | Wait up to the configured @timeOutSeconds@ for the next item on the
-- websocket's event channel and run @expectations@ on it, with the
-- pretty-printed event added to the failure context.
--
-- Fails if nothing arrives in time or if the channel delivers a connection
-- exception instead of an event.
assertEvent :: (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a
assertEvent ws expectations = do
  secs <- asks (.timeOutSeconds)
  received <- timeout (secs * 1_000_000) (readChan ws.events)
  case received of
    Just (Right ev) -> do
      rendered <- prettyJSON ev
      addFailureContext ("event:\n" <> rendered) (expectations ev)
    Just (Left wsErr) ->
      addFailureContext
        ("WSException: " <> displayException wsErr)
        (assertFailure "Websocket closed when waiting for more events")
    Nothing ->
      assertFailure ("No event received for " <> show secs <> "s")

-- | Look for an event satisfying @expectations@, tolerating other events that
-- arrive first: each non-matching event is acknowledged with 'ackEvent' and
-- skipped.
assertFindsEvent :: forall a. (HasCallStack) => EventWebSocket -> ((HasCallStack) => Value -> App a) -> App a
assertFindsEvent ws expectations =
  assertFindsEventConfigurableAck ackEvent ws expectations

-- | Read events until one satisfies @expectations@.
--
-- An event on which @expectations@ throws an 'AssertionFailure' is passed to
-- @ackFun@, recorded in the failure context (labelled with its type, if one
-- can be found), and skipped. Fails when no event arrives within the
-- configured @timeOutSeconds@ or when the channel delivers a connection
-- exception.
assertFindsEventConfigurableAck ::
  forall a.
  (HasCallStack) =>
  ((HasCallStack) => EventWebSocket -> Value -> App ()) ->
  EventWebSocket ->
  ((HasCallStack) => Value -> App a) ->
  App a
assertFindsEventConfigurableAck ackFun ws expectations = go 0
  where
    -- The argument counts the events skipped so far (used in the timeout message).
    go :: Int -> App a
    go skipped = do
      secs <- asks (.timeOutSeconds)
      next <- timeout (secs * 1_000_000) (readChan ws.events)
      case next of
        Nothing ->
          assertFailure
            (show skipped <> " event(s) received, no matching event received for " <> show secs <> "s")
        Just (Left wsErr) ->
          addFailureContext
            ("WSException: " <> displayException wsErr)
            (assertFailure "Websocket closed when waiting for more events")
        Just (Right ev) ->
          expectations ev `catch` \(_ :: AssertionFailure) -> do
            skippedType <- describeEventType ev
            ackFun ws ev
            addJSONToFailureContext
              ("Ignored Event (" <> skippedType <> ")")
              ev
              (go (skipped + 1))

    -- Prefer the type of the first payload entry of the nested event; fall
    -- back to the top-level @type@ field, and to "No Type" if neither exists.
    describeEventType :: Value -> App String
    describeEventType ev = do
      mType <-
        runMaybeT
          ( (lookupFieldM ev "data.event" >>= \inner -> lookupFieldM inner "payload.0.type")
              <|> lookupFieldM ev "type"
          )
      maybe (pure "No Type") asString mType

-- | Outcome of waiting for an event that is expected not to arrive.
data NoEvent
  = -- | The wait timed out without anything arriving on the event channel.
    NoEvent
  | -- | The event channel delivered a connection exception.
    WebSocketDied

-- | Each outcome is encoded as a fixed JSON string.
instance ToJSON NoEvent where
  toJSON = \case
    NoEvent -> toJSON "no-event"
    WebSocketDied -> toJSON "web-socket-died"

-- | Wait up to the configured @timeOutSeconds@ on the event channel and report
-- why nothing arrived: 'NoEvent' on timeout, 'WebSocketDied' when the channel
-- delivers a connection exception. Receiving an actual event is a test failure.
assertNoEventHelper :: (HasCallStack) => EventWebSocket -> App NoEvent
assertNoEventHelper ws = do
  secs <- asks (.timeOutSeconds)
  outcome <- timeout (secs * 1_000_000) (readChan ws.events)
  case outcome of
    Nothing -> pure NoEvent
    Just (Left _) -> pure WebSocketDied
    Just (Right unexpected) ->
      prettyJSON unexpected >>= \rendered ->
        assertFailure ("Did not expect event: \n" <> rendered)

-- | Similar to @assertNoEvent@ from Testlib, but for the RabbitMQ-backed
-- 'EventWebSocket' (the @/events@ end-point, not @/await@). The outcome
-- reported by 'assertNoEventHelper' is discarded.
assertNoEvent_ :: (HasCallStack) => EventWebSocket -> App ()
assertNoEvent_ ws = void (assertNoEventHelper ws)

-- | Assert that the websocket has died, i.e. that 'assertNoEventHelper'
-- reports 'WebSocketDied'.
--
-- The check is retried with a constant 800ms delay, bounded by a cumulative
-- delay of the configured @timeOutSeconds@.
assertWebSocketDied :: (HasCallStack) => EventWebSocket -> App ()
assertWebSocketDied ws = do
  secs <- asks (.timeOutSeconds)
  let policy = limitRetriesByCumulativeDelay (secs * 1_000_000) (constantDelay 800_000)
  recoverAll policy $ \_ -> do
    outcome <- assertNoEventHelper ws
    case outcome of
      WebSocketDied -> pure ()
      NoEvent -> assertFailure "WebSocket is still open"

-- | Read and acknowledge events until a @synchronization@ event shows up, and
-- return everything consumed (including that event) in arrival order.
--
-- Fails when the synchronization event carries a marker id other than
-- @expectedMarkerId@, when no event arrives within one second, or when the
-- channel delivers a connection exception.
consumeEventsUntilEndOfInitialSync :: (HasCallStack) => EventWebSocket -> String -> App [Value]
consumeEventsUntilEndOfInitialSync ws expectedMarkerId = go []
  where
    go seen = do
      next <- timeout 1_000_000 (readChan ws.events)
      case next of
        Nothing ->
          addJSONToFailureContext
            "events"
            seen
            (assertFailure "timed out waiting for end-of-initial-sync event")
        Just (Left wsErr) ->
          assertFailure
            ( "Websocket closed while waiting for end-of-initial-sync event "
                <> displayException wsErr
            )
        Just (Right ev) -> do
          ackEvent ws ev
          evType <- asString (ev %. "type")
          let seen' = seen <> [ev]
          if evType /= "synchronization"
            then go seen'
            else do
              markerId <- asString (ev %. "data.marker_id")
              when (markerId /= expectedMarkerId)
                $ assertFailure ("Expected marker_id " <> expectedMarkerId <> ", but got " <> markerId)
              pure seen'

-- | Assert that the next event is a @synchronization@ event whose
-- @data.marker_id@ equals @endMarker@.
assertEndOfIniitalSync :: (HasCallStack) => EventWebSocket -> String -> App ()
assertEndOfIniitalSync ws endMarker = assertEvent ws isEndOfSync
  where
    isEndOfSync :: (HasCallStack) => Value -> App ()
    isEndOfSync ev = do
      shouldMatch (ev %. "type") "synchronization"
      shouldMatch (ev %. "data.marker_id") endMarker

-- | Run 'consumeAllEvents' and discard the collected events.
consumeAllEvents_ :: EventWebSocket -> App ()
consumeAllEvents_ ws = void (consumeAllEvents ws)

-- | Drain the event queue of the given user's client: open an events
-- websocket with a sync marker and consume (and acknowledge) everything up to
-- the end-of-initial-sync event. The consumed events are discarded.
emptyQueue :: (HasCallStack, MakesValue user) => user -> String -> App ()
emptyQueue user cid =
  runCodensity (createEventsWebSocketWithSync user (Just cid)) drainInitialSync
  where
    drainInitialSync (endMarker, ws) =
      void (consumeEventsUntilEndOfInitialSync ws endMarker)

-- | Read events from @ws.events@ until no new item arrives within the
-- 1_000_000 microsecond timeout, acknowledging each event with 'ackEvent'
-- before reading the next one.
--
-- Returns the events in the order they were read. If the channel yields a
-- 'ConnectionException' instead of an event, the test fails via
-- 'assertFailure'.
consumeAllEvents :: EventWebSocket -> App [Value]
consumeAllEvents ws =
  timeout 1_000_000 (readChan ws.events) >>= \case
    -- Nothing arrived within the timeout: treat the stream as drained.
    Nothing -> pure []
    Just (Left e) ->
      assertFailure
        $ "Websocket closed while consuming all events: "
        <> displayException e
    Just (Right e) -> do
      ackEvent ws e
      (e :) <$> consumeAllEvents ws

-- | Read events from @ws.events@ until no new item arrives within the
-- 1_000_000 microsecond timeout, without acknowledging any of them.
--
-- Returns the events in the order they were read. If the channel yields a
-- 'ConnectionException' instead of an event, the test fails via
-- 'assertFailure'.
consumeAllEventsNoAck :: EventWebSocket -> App [Value]
consumeAllEventsNoAck ws =
  timeout 1_000_000 (readChan ws.events) >>= \case
    -- Nothing arrived within the timeout: treat the stream as drained.
    Nothing -> pure []
    Just (Left e) ->
      assertFailure
        $ "Websocket closed while consuming all events: "
        <> displayException e
    Just (Right e) ->
      (e :) <$> consumeAllEventsNoAck ws

-- | Wait until 'getCannonConnections' reports no connections for the
-- backend's vhost. Only considers connections from cannon.
--
-- The check is wrapped in 'recoverAll' with a constant 500_000 microsecond
-- delay and a limit of 10 retries, so a failing check is re-run under that
-- policy before the failure is propagated.
waitUntilNoRabbitMqConns :: (HasCallStack) => BackendResource -> App ()
waitUntilNoRabbitMqConns backend = do
  rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend
  recoverAll
    (constantDelay 500_000 <> limitRetries 10)
    (const (go rabbitmqAdminClient))
  where
    -- A single check: assert that the list of cannon connections is empty.
    go :: AdminAPI (AsClientT App) -> App ()
    go rabbitmqAdminClient = do
      cannonConnections <- getCannonConnections rabbitmqAdminClient backend.berVHost
      cannonConnections `shouldMatch` ([] :: [Connection])

-- | Delete every connection returned by 'getCannonConnections' for the
-- backend's vhost through the RabbitMQ admin client, and return the list of
-- connections that were targeted. Only kills connections from cannon.
killAllRabbitMqConns :: (HasCallStack) => BackendResource -> App [Connection]
killAllRabbitMqConns backend = do
  rabbitmqAdminClient <- mkRabbitMqAdminClientForResource backend
  cannonConnections <- getCannonConnections rabbitmqAdminClient backend.berVHost
  for_ cannonConnections $ \connection ->
    rabbitmqAdminClient.deleteConnection connection.name
  pure cannonConnections

-- | List the connections of the given vhost through the RabbitMQ admin client
-- and keep only those whose @userProvidedName@ is present and starts with
-- @"pool "@. Connections without a user-provided name are dropped.
getCannonConnections :: AdminAPI (AsClientT App) -> String -> App [Connection]
getCannonConnections rabbitmqAdminClient vhost = do
  connections <- rabbitmqAdminClient.listConnectionsByVHost (Text.pack vhost)
  pure $ filter hasPoolPrefix connections
  where
    hasPoolPrefix c =
      maybe False (fromString "pool " `Text.isPrefixOf`) c.userProvidedName

-- | Build a RabbitMQ admin client for the given backend resource.
--
-- Takes the @rabbitMQConfig@ options from the environment, overrides their
-- @vHost@ with the backend's @berVHost@, and hoists the resulting servant
-- client into 'App' with 'liftIO'.
mkRabbitMqAdminClientForResource :: BackendResource -> App (AdminAPI (Servant.AsClientT App))
mkRabbitMqAdminClientForResource backend = do
  opts <- asks (.rabbitMQConfig)
  servantClient <- liftIO $ mkRabbitMqAdminClientEnv opts {vHost = Text.pack backend.berVHost}
  pure . fromServant $ Servant.hoistClient (Proxy @(ToServant AdminAPI AsApi)) (liftIO @App) (toServant servantClient)