module Galley.API.Teams.Export (getTeamMembersCSV) where

import Control.Concurrent
import Control.Concurrent.Async qualified as Async
import Control.Exception qualified as Exception
import Control.Lens (view, (^.))
import Control.Monad.Codensity
import Data.ByteString (toStrict)
import Data.ByteString.Builder
import Data.Csv
import Data.Handle
import Data.IORef (atomicModifyIORef, newIORef)
import Data.Id
import Data.Map qualified as Map
import Data.Qualified (Local, tUnqualified)
import Galley.Effects
import Galley.Effects.BrigAccess
import Galley.Effects.SparAccess qualified as Spar
import Galley.Effects.TeamMemberStore (listTeamMembers)
import Galley.Effects.TeamStore
import Imports hiding (atomicModifyIORef, newEmptyMVar, newIORef, putMVar, readMVar, takeMVar, threadDelay, tryPutMVar)
import Polysemy
import Polysemy.Async
import Polysemy.Resource
import Wire.API.Error
import Wire.API.Error.Galley
import Wire.API.Routes.LowLevelStream (LowLevelStreamingBody)
import Wire.API.Team.Export
import Wire.API.Team.Member
import Wire.API.User (ScimUserInfo (suiCreatedOn), User (..))
import Wire.Sem.Concurrency
import Wire.Sem.Concurrency.IO
import Wire.Sem.Paging qualified as E
import Wire.Sem.Paging.Cassandra (InternalPaging)

-- | Cache of inviter handles.
--
-- This is used to make sure that inviters are only looked up once in brig,
-- even if they appear as inviters of several users in the team.
--
-- The map is keyed by the inviter's 'UserId'. Each entry is an 'MVar' that is
-- inserted empty by the first thread asking for that inviter and filled once
-- the lookup has finished (with 'Nothing' if no handle could be determined).
-- Threads that find an existing entry read the 'MVar', blocking until it has
-- been filled.
type InviterCache = IORef (Map UserId (MVar (Maybe Handle)))

-- | Resolve the handle of an inviter, going through the shared cache.
--
-- The first caller for a given 'UserId' registers an empty 'MVar' in the
-- cache, fetches the user via 'getUsers' and stores the user's handle in that
-- 'MVar'. Every other caller for the same 'UserId' finds the existing 'MVar'
-- and blocks on it until the value is available.
--
-- Returns 'Nothing' when the user is not returned by 'getUsers' or has no
-- handle. If the action fails with an exception, the cache entry for this
-- user (if any) is filled with 'Nothing' so that waiting threads are released.
lookupInviter ::
  (Member Resource r, Member BrigAccess r, Member (Final IO) r) =>
  InviterCache ->
  UserId ->
  Sem r (Maybe Handle)
lookupInviter cache uid = flip onException ensureCache $ do
  -- allocated up front because the update function passed to
  -- 'atomicModifyIORef' is pure and cannot create the MVar itself
  fresh <- embedFinal newEmptyMVar
  (cached, var) <-
    embedFinal $ atomicModifyIORef cache $ \m -> case Map.lookup uid m of
      Nothing -> (Map.insert uid fresh m, (False, fresh))
      Just existing -> (m, (True, existing))
  -- the cache did not contain this user, so write it in the corresponding MVar
  unless cached $ do
    mUser <- listToMaybe <$> getUsers [uid]
    embedFinal $ putMVar var (mUser >>= userHandle)
  -- at this point, we know that the MVar contains a value or some other thread
  -- is about to write one, so it is safe to just read from the MVar with a
  -- blocking call
  embedFinal $ readMVar var
  where
    -- this is run in case of errors to guarantee that other threads will never
    -- deadlock while reading the cache
    ensureCache = embedFinal $ do
      m <- readIORef cache
      for_ (Map.lookup uid m) $ \var ->
        tryPutMVar var Nothing

-- | Build the CSV export record for a single team member.
--
-- The base record comes from 'getUserExportData'; if that returns 'Nothing',
-- 'TeamMemberNotFound' is thrown. On top of the base record this fills in:
--
-- * the creation time: the invitation timestamp when the member has an
--   invitation, otherwise whatever 'Spar.lookupScimUserInfo' reports;
-- * the inviter's handle, resolved through the given 'InviterCache';
-- * the role derived from the member's permissions.
getUserRecord ::
  ( Member BrigAccess r,
    Member Spar.SparAccess r,
    Member (ErrorS TeamMemberNotFound) r,
    Member (Final IO) r,
    Member Resource r
  ) =>
  InviterCache ->
  TeamMember ->
  Sem r TeamExportUser
getUserRecord cache member = do
  let uid = member ^. userId
      -- (inviter id, invitation timestamp), if the member was invited
      mInvitation = member ^. invitation
  export <- getUserExportData uid >>= noteS @TeamMemberNotFound
  mCreatedOn <- case snd <$> mInvitation of
    Just ts -> pure $ Just ts
    -- no invitation: fall back to the SCIM user info
    Nothing -> suiCreatedOn <$> Spar.lookupScimUserInfo uid
  -- look up inviter handle from the cache
  invitedBy <- join <$> traverse (lookupInviter cache) (fst <$> mInvitation)
  pure
    export
      { tExportInvitedBy = invitedBy,
        tExportRole = permissionsRole . view permissions $ member,
        tExportCreatedOn = mCreatedOn
      }

-- | Export team info as a CSV, and stream it to the client.
--
-- We paginate through the team member list, then spawn a thread for each user
-- (out of a thread pool) in order to fetch information for that user from brig
-- and spar. Inviter IDs are resolved to handles via a brig request, then
-- stored in a cache so that they can be reused by subsequent requests.
--
-- The requesting user must be a member of the team and hold the
-- 'DownloadTeamMembersCsv' permission; otherwise 'AccessDenied' is thrown
-- before any streaming starts.
--
-- Rows are handed from the producer thread to the response body through a
-- 'Chan'; a 'Nothing' on the channel marks the end of the stream. Members for
-- which 'getUserRecord' throws 'TeamMemberNotFound' are skipped.
getTeamMembersCSV ::
  forall r.
  ( Member BrigAccess r,
    Member (ErrorS 'AccessDenied) r,
    Member (TeamMemberStore InternalPaging) r,
    Member TeamStore r,
    Member (Final IO) r,
    Member SparAccess r
  ) =>
  Local UserId ->
  TeamId ->
  Sem r LowLevelStreamingBody
getTeamMembersCSV lusr tid = do
  getTeamMember tid (tUnqualified lusr) >>= \case
    Nothing -> throwS @'AccessDenied
    Just member -> unless (member `hasPermission` DownloadTeamMembersCsv) $ throwS @'AccessDenied

  chan <- embedFinal newChan
  cache <- embedFinal $ newIORef mempty

  let encodeRow r = encodeDefaultOrderedByNameWith customEncodeOptions [r]
  let produceTeamExportUsers = do
        -- the header goes first so the consumer can flush it right away
        embedFinal $ writeChan chan (Just headerLine)
        E.withChunks (\mps -> listTeamMembers @InternalPaging tid mps maxBound) $
          \members -> unsafePooledForConcurrentlyN_ 8 members $ \member -> do
            mRecord <-
              runErrorS @TeamMemberNotFound $
                getUserRecord cache member
            -- members without an export record are silently left out
            for_ mRecord $ \record ->
              embedFinal $
                writeChan chan (Just (encodeRow record))

  -- In case an exception is thrown inside the producer thread, the response
  -- will not contain a correct error message, but rather be an http error such
  -- as 'InvalidChunkHeaders'. The exception however still reaches the
  -- middleware and is being tracked in logging and metrics.
  let producerThread =
        produceTeamExportUsers
          -- always terminate the stream, even on failure, so that the
          -- consumer never blocks forever on 'readChan'
          `finally` embedFinal (writeChan chan Nothing)

  asyncToIOFinal . resourceToIOFinal . unsafelyPerformConcurrency @_ @Unsafe $ do
    -- Here we should really capture the Wai continuation and run the finaliser
    -- after that. Unfortunately, this is not really possible with Servant,
    -- because the continuation is not exposed by the Handler monad. The best
    -- we can do is return a Codensity value with the correct finaliser, but
    -- that still leaves a short window between when the resource is acquired
    -- and when the finaliser is installed where the resource might be leaked.
    -- I don't have a good solution for that.
    bracketOnError
      (async producerThread)
      cancel
      $ \producer -> do
        pure $ do
          -- Cancel the producer once the response continuation is done. This
          -- must also happen when the continuation throws (e.g. the client
          -- went away mid-stream); otherwise the producer would keep querying
          -- brig and spar for a response nobody is reading.
          void $ Codensity $ \k ->
            k () `Exception.finally` Async.cancel producer
          pure $ \write flush -> do
            let go = do
                  readChan chan >>= \case
                    -- end of stream
                    Nothing -> write "" >> flush
                    Just line -> write (byteString (toStrict line)) >> flush >> go
            go

-- | The CSV header row for 'TeamExportUser'.
--
-- Obtained by encoding an empty list of records with 'customEncodeOptions',
-- except that the header is switched on.
headerLine :: LByteString
headerLine =
  encodeDefaultOrderedByNameWith
    (customEncodeOptions {encIncludeHeader = True})
    ([] :: [TeamExportUser])

-- | Encoding options used for every data row of the export.
--
-- The header is disabled here because rows are encoded one at a time; the
-- header row is produced separately by 'headerLine'.
customEncodeOptions :: EncodeOptions
customEncodeOptions =
  EncodeOptions
    { encDelimiter = fromIntegral (ord ','),
      encUseCrLf = True, -- to be compatible with Mac and Windows
      encIncludeHeader = False, -- (so we can flush when the header is on the wire)
      encQuoting = QuoteAll
    }