module Wire.IndexedUserStore.Bulk.ElasticSearch where

import Cassandra.Exec (paginateWithStateC)
import Conduit (ConduitT, runConduit, (.|))
import Data.Conduit.Combinators qualified as Conduit
import Data.Id
import Data.Map qualified as Map
import Data.Set qualified as Set
import Database.Bloodhound qualified as ES
import Imports
import Polysemy
import Polysemy.Error
import Polysemy.TinyLog
import Polysemy.TinyLog qualified as Log
import System.Logger.Message qualified as Log
import Wire.API.Team.Feature
import Wire.GalleyAPIAccess
import Wire.IndexedUserStore (IndexedUserStore)
import Wire.IndexedUserStore qualified as IndexedUserStore
import Wire.IndexedUserStore.Bulk
import Wire.IndexedUserStore.MigrationStore
import Wire.IndexedUserStore.MigrationStore qualified as MigrationStore
import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (Unsafe), unsafePooledForConcurrentlyN)
import Wire.UserSearch.Migration
import Wire.UserSearch.Types
import Wire.UserStore
import Wire.UserStore.IndexUser

interpretIndexedUserStoreBulk ::
  ( Member TinyLog r,
    Member UserStore r,
    Member (Concurrency Unsafe) r,
    Member GalleyAPIAccess r,
    Member IndexedUserStore r,
    Member (Error MigrationException) r,
    Member IndexedUserMigrationStore r
  ) =>
  InterpreterFor IndexedUserStoreBulk r
interpretIndexedUserStoreBulk :: forall (r :: EffectRow).
(Member TinyLog r, Member UserStore r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r, Member (Error MigrationException) r,
 Member IndexedUserMigrationStore r) =>
InterpreterFor IndexedUserStoreBulk r
interpretIndexedUserStoreBulk = (forall (rInitial :: EffectRow) x.
 IndexedUserStoreBulk (Sem rInitial) x -> Sem r x)
-> Sem (IndexedUserStoreBulk : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret \case
  IndexedUserStoreBulk (Sem rInitial) x
SyncAllUsers -> Sem r x
Sem r ()
forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
Sem r ()
syncAllUsersImpl
  IndexedUserStoreBulk (Sem rInitial) x
ForceSyncAllUsers -> Sem r x
Sem r ()
forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
Sem r ()
forceSyncAllUsersImpl
  IndexedUserStoreBulk (Sem rInitial) x
MigrateData -> Sem r x
Sem r ()
forall (r :: EffectRow).
(Member IndexedUserStore r, Member (Error MigrationException) r,
 Member IndexedUserMigrationStore r, Member UserStore r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member TinyLog r) =>
Sem r ()
migrateDataImpl

syncAllUsersImpl ::
  forall r.
  ( Member UserStore r,
    Member TinyLog r,
    Member (Concurrency 'Unsafe) r,
    Member GalleyAPIAccess r,
    Member IndexedUserStore r
  ) =>
  Sem r ()
syncAllUsersImpl :: forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
Sem r ()
syncAllUsersImpl = (ExternalDocVersion -> VersionControl) -> Sem r ()
forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
(ExternalDocVersion -> VersionControl) -> Sem r ()
syncAllUsersWithVersion ExternalDocVersion -> VersionControl
ES.ExternalGT

forceSyncAllUsersImpl ::
  forall r.
  ( Member UserStore r,
    Member TinyLog r,
    Member (Concurrency 'Unsafe) r,
    Member GalleyAPIAccess r,
    Member IndexedUserStore r
  ) =>
  Sem r ()
forceSyncAllUsersImpl :: forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
Sem r ()
forceSyncAllUsersImpl = (ExternalDocVersion -> VersionControl) -> Sem r ()
forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
(ExternalDocVersion -> VersionControl) -> Sem r ()
syncAllUsersWithVersion ExternalDocVersion -> VersionControl
ES.ExternalGTE

syncAllUsersWithVersion ::
  forall r.
  ( Member UserStore r,
    Member TinyLog r,
    Member (Concurrency 'Unsafe) r,
    Member GalleyAPIAccess r,
    Member IndexedUserStore r
  ) =>
  (ES.ExternalDocVersion -> ES.VersionControl) ->
  Sem r ()
syncAllUsersWithVersion :: forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
(ExternalDocVersion -> VersionControl) -> Sem r ()
syncAllUsersWithVersion ExternalDocVersion -> VersionControl
mkVersion =
  ConduitT () Void (Sem r) () -> Sem r ()
forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT () Void (Sem r) () -> Sem r ())
-> ConduitT () Void (Sem r) () -> Sem r ()
forall a b. (a -> b) -> a -> b
$
    (Maybe PagingState -> Sem r (PageWithState IndexUser))
-> ConduitT () [IndexUser] (Sem r) ()
forall (m :: * -> *) a.
Monad m =>
(Maybe PagingState -> m (PageWithState a)) -> ConduitT () [a] m ()
paginateWithStateC (Int32 -> Maybe PagingState -> Sem r (PageWithState IndexUser)
forall (r :: EffectRow).
Member UserStore r =>
Int32 -> Maybe PagingState -> Sem r (PageWithState IndexUser)
getIndexUsersPaginated Int32
1000)
      ConduitT () [IndexUser] (Sem r) ()
-> ConduitT [IndexUser] Void (Sem r) ()
-> ConduitT () Void (Sem r) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT [IndexUser] [IndexUser] (Sem r) ()
logPage
      ConduitT [IndexUser] [IndexUser] (Sem r) ()
-> ConduitT [IndexUser] Void (Sem r) ()
-> ConduitT [IndexUser] Void (Sem r) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) ()
mkUserDocs
      ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) ()
-> ConduitT [(DocId, UserDoc, VersionControl)] Void (Sem r) ()
-> ConduitT [IndexUser] Void (Sem r) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ([(DocId, UserDoc, VersionControl)] -> Sem r ())
-> ConduitT [(DocId, UserDoc, VersionControl)] Void (Sem r) ()
forall (m :: * -> *) a o.
Monad m =>
(a -> m ()) -> ConduitT a o m ()
Conduit.mapM_ [(DocId, UserDoc, VersionControl)] -> Sem r ()
forall (r :: EffectRow).
Member IndexedUserStore r =>
[(DocId, UserDoc, VersionControl)] -> Sem r ()
IndexedUserStore.bulkUpsert
  where
    logPage :: ConduitT [IndexUser] [IndexUser] (Sem r) ()
    logPage :: ConduitT [IndexUser] [IndexUser] (Sem r) ()
logPage = ([IndexUser] -> Sem r ())
-> ConduitT [IndexUser] [IndexUser] (Sem r) ()
forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m ()
Conduit.iterM (([IndexUser] -> Sem r ())
 -> ConduitT [IndexUser] [IndexUser] (Sem r) ())
-> ([IndexUser] -> Sem r ())
-> ConduitT [IndexUser] [IndexUser] (Sem r) ()
forall a b. (a -> b) -> a -> b
$ \[IndexUser]
page -> do
      (Msg -> Msg) -> Sem r ()
forall msg (r :: EffectRow).
Member (Logger msg) r =>
msg -> Sem r ()
info ((Msg -> Msg) -> Sem r ()) -> (Msg -> Msg) -> Sem r ()
forall a b. (a -> b) -> a -> b
$
        ByteString -> Int -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"size" ([IndexUser] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [IndexUser]
page)
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Reindex: processing C* page")

    mkUserDocs :: ConduitT [IndexUser] [(ES.DocId, UserDoc, ES.VersionControl)] (Sem r) ()
    mkUserDocs :: ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) ()
mkUserDocs = ([IndexUser] -> Sem r [(DocId, UserDoc, VersionControl)])
-> ConduitT
     [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
Conduit.mapM (([IndexUser] -> Sem r [(DocId, UserDoc, VersionControl)])
 -> ConduitT
      [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) ())
-> ([IndexUser] -> Sem r [(DocId, UserDoc, VersionControl)])
-> ConduitT
     [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) ()
forall a b. (a -> b) -> a -> b
$ \[IndexUser]
page -> do
      let teamIds :: Set TeamId
teamIds =
            [TeamId] -> Set TeamId
forall a. Ord a => [a] -> Set a
Set.fromList ([TeamId] -> Set TeamId) -> [TeamId] -> Set TeamId
forall a b. (a -> b) -> a -> b
$
              (IndexUser -> Maybe TeamId) -> [IndexUser] -> [TeamId]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe ((WithWritetime TeamId -> TeamId)
-> Maybe (WithWritetime TeamId) -> Maybe TeamId
forall a b. (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap WithWritetime TeamId -> TeamId
forall a. WithWritetime a -> a
value (Maybe (WithWritetime TeamId) -> Maybe TeamId)
-> (IndexUser -> Maybe (WithWritetime TeamId))
-> IndexUser
-> Maybe TeamId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((.teamId))) [IndexUser]
page
      Map TeamId SearchVisibilityInbound
visMap <- ([(TeamId, SearchVisibilityInbound)]
 -> Map TeamId SearchVisibilityInbound)
-> Sem r [(TeamId, SearchVisibilityInbound)]
-> Sem r (Map TeamId SearchVisibilityInbound)
forall a b. (a -> b) -> Sem r a -> Sem r b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [(TeamId, SearchVisibilityInbound)]
-> Map TeamId SearchVisibilityInbound
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList (Sem r [(TeamId, SearchVisibilityInbound)]
 -> Sem r (Map TeamId SearchVisibilityInbound))
-> ((TeamId -> Sem r (TeamId, SearchVisibilityInbound))
    -> Sem r [(TeamId, SearchVisibilityInbound)])
-> (TeamId -> Sem r (TeamId, SearchVisibilityInbound))
-> Sem r (Map TeamId SearchVisibilityInbound)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int
-> Set TeamId
-> (TeamId -> Sem r (TeamId, SearchVisibilityInbound))
-> Sem r [(TeamId, SearchVisibilityInbound)]
forall (r :: EffectRow) (t :: * -> *) a b.
(Member (Concurrency 'Unsafe) r, Foldable t) =>
Int -> t a -> (a -> Sem r b) -> Sem r [b]
unsafePooledForConcurrentlyN Int
16 Set TeamId
teamIds ((TeamId -> Sem r (TeamId, SearchVisibilityInbound))
 -> Sem r (Map TeamId SearchVisibilityInbound))
-> (TeamId -> Sem r (TeamId, SearchVisibilityInbound))
-> Sem r (Map TeamId SearchVisibilityInbound)
forall a b. (a -> b) -> a -> b
$ \TeamId
t ->
        (TeamId
t,) (SearchVisibilityInbound -> (TeamId, SearchVisibilityInbound))
-> Sem r SearchVisibilityInbound
-> Sem r (TeamId, SearchVisibilityInbound)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TeamId -> Sem r SearchVisibilityInbound
forall (r :: EffectRow).
Member GalleyAPIAccess r =>
TeamId -> Sem r SearchVisibilityInbound
teamSearchVisibilityInbound TeamId
t
      let vis :: IndexUser -> SearchVisibilityInbound
vis IndexUser
indexUser = SearchVisibilityInbound
-> Maybe SearchVisibilityInbound -> SearchVisibilityInbound
forall a. a -> Maybe a -> a
fromMaybe SearchVisibilityInbound
defaultSearchVisibilityInbound (Maybe SearchVisibilityInbound -> SearchVisibilityInbound)
-> Maybe SearchVisibilityInbound -> SearchVisibilityInbound
forall a b. (a -> b) -> a -> b
$ ((TeamId
 -> Map TeamId SearchVisibilityInbound
 -> Maybe SearchVisibilityInbound)
-> Map TeamId SearchVisibilityInbound
-> TeamId
-> Maybe SearchVisibilityInbound
forall a b c. (a -> b -> c) -> b -> a -> c
flip TeamId
-> Map TeamId SearchVisibilityInbound
-> Maybe SearchVisibilityInbound
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Map TeamId SearchVisibilityInbound
visMap (TeamId -> Maybe SearchVisibilityInbound)
-> (WithWritetime TeamId -> TeamId)
-> WithWritetime TeamId
-> Maybe SearchVisibilityInbound
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WithWritetime TeamId -> TeamId
forall a. WithWritetime a -> a
value (WithWritetime TeamId -> Maybe SearchVisibilityInbound)
-> Maybe (WithWritetime TeamId) -> Maybe SearchVisibilityInbound
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IndexUser
indexUser.teamId)
          mkUserDoc :: IndexUser -> UserDoc
mkUserDoc IndexUser
indexUser = SearchVisibilityInbound -> IndexUser -> UserDoc
indexUserToDoc (IndexUser -> SearchVisibilityInbound
vis IndexUser
indexUser) IndexUser
indexUser
          mkDocVersion :: IndexUser -> VersionControl
mkDocVersion = ExternalDocVersion -> VersionControl
mkVersion (ExternalDocVersion -> VersionControl)
-> (IndexUser -> ExternalDocVersion) -> IndexUser -> VersionControl
forall b c a. (b -> c) -> (a -> b) -> a -> c
. DocVersion -> ExternalDocVersion
ES.ExternalDocVersion (DocVersion -> ExternalDocVersion)
-> (IndexUser -> DocVersion) -> IndexUser -> ExternalDocVersion
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IndexVersion -> DocVersion
docVersion (IndexVersion -> DocVersion)
-> (IndexUser -> IndexVersion) -> IndexUser -> DocVersion
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IndexUser -> IndexVersion
indexUserToVersion
      [(DocId, UserDoc, VersionControl)]
-> Sem r [(DocId, UserDoc, VersionControl)]
forall a. a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([(DocId, UserDoc, VersionControl)]
 -> Sem r [(DocId, UserDoc, VersionControl)])
-> [(DocId, UserDoc, VersionControl)]
-> Sem r [(DocId, UserDoc, VersionControl)]
forall a b. (a -> b) -> a -> b
$ (IndexUser -> (DocId, UserDoc, VersionControl))
-> [IndexUser] -> [(DocId, UserDoc, VersionControl)]
forall a b. (a -> b) -> [a] -> [b]
map (\IndexUser
u -> (UserId -> DocId
userIdToDocId IndexUser
u.userId, IndexUser -> UserDoc
mkUserDoc IndexUser
u, IndexUser -> VersionControl
mkDocVersion IndexUser
u)) [IndexUser]
page

migrateDataImpl ::
  ( Member IndexedUserStore r,
    Member (Error MigrationException) r,
    Member IndexedUserMigrationStore r,
    Member UserStore r,
    Member (Concurrency Unsafe) r,
    Member GalleyAPIAccess r,
    Member TinyLog r
  ) =>
  Sem r ()
migrateDataImpl :: forall (r :: EffectRow).
(Member IndexedUserStore r, Member (Error MigrationException) r,
 Member IndexedUserMigrationStore r, Member UserStore r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member TinyLog r) =>
Sem r ()
migrateDataImpl = do
  Sem r Bool -> Sem r () -> Sem r ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
unlessM Sem r Bool
forall (r :: EffectRow). Member IndexedUserStore r => Sem r Bool
IndexedUserStore.doesIndexExist (Sem r () -> Sem r ()) -> Sem r () -> Sem r ()
forall a b. (a -> b) -> a -> b
$
    MigrationException -> Sem r ()
forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a
throw MigrationException
TargetIndexAbsent
  Sem r ()
forall (r :: EffectRow).
Member IndexedUserMigrationStore r =>
Sem r ()
MigrationStore.ensureMigrationIndex
  MigrationVersion
foundVersion <- Sem r MigrationVersion
forall (r :: EffectRow).
Member IndexedUserMigrationStore r =>
Sem r MigrationVersion
MigrationStore.getLatestMigrationVersion
  if MigrationVersion
expectedMigrationVersion MigrationVersion -> MigrationVersion -> Bool
forall a. Ord a => a -> a -> Bool
> MigrationVersion
foundVersion
    then do
      (Msg -> Msg) -> Sem r ()
forall msg (r :: EffectRow).
Member (Logger msg) r =>
msg -> Sem r ()
Log.info ((Msg -> Msg) -> Sem r ()) -> (Msg -> Msg) -> Sem r ()
forall a b. (a -> b) -> a -> b
$
        Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"Migration necessary.")
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> MigrationVersion -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"expectedVersion" MigrationVersion
expectedMigrationVersion
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> MigrationVersion -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"foundVersion" MigrationVersion
foundVersion
      Sem r ()
forall (r :: EffectRow).
(Member UserStore r, Member TinyLog r,
 Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r,
 Member IndexedUserStore r) =>
Sem r ()
forceSyncAllUsersImpl
      MigrationVersion -> Sem r ()
forall (r :: EffectRow).
Member IndexedUserMigrationStore r =>
MigrationVersion -> Sem r ()
MigrationStore.persistMigrationVersion MigrationVersion
expectedMigrationVersion
    else do
      (Msg -> Msg) -> Sem r ()
forall msg (r :: EffectRow).
Member (Logger msg) r =>
msg -> Sem r ()
Log.info ((Msg -> Msg) -> Sem r ()) -> (Msg -> Msg) -> Sem r ()
forall a b. (a -> b) -> a -> b
$
        Builder -> Msg -> Msg
forall a. ToBytes a => a -> Msg -> Msg
Log.msg (ByteString -> Builder
Log.val ByteString
"No migration necessary.")
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> MigrationVersion -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"expectedVersion" MigrationVersion
expectedMigrationVersion
          (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> MigrationVersion -> Msg -> Msg
forall a. ToBytes a => ByteString -> a -> Msg -> Msg
Log.field ByteString
"foundVersion" MigrationVersion
foundVersion

teamSearchVisibilityInbound :: (Member GalleyAPIAccess r) => TeamId -> Sem r SearchVisibilityInbound
teamSearchVisibilityInbound :: forall (r :: EffectRow).
Member GalleyAPIAccess r =>
TeamId -> Sem r SearchVisibilityInbound
teamSearchVisibilityInbound TeamId
tid =
  FeatureStatus -> SearchVisibilityInbound
searchVisibilityInboundFromFeatureStatus (FeatureStatus -> SearchVisibilityInbound)
-> (LockableFeature SearchVisibilityInboundConfig -> FeatureStatus)
-> LockableFeature SearchVisibilityInboundConfig
-> SearchVisibilityInbound
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (.status)
    (LockableFeature SearchVisibilityInboundConfig
 -> SearchVisibilityInbound)
-> Sem r (LockableFeature SearchVisibilityInboundConfig)
-> Sem r SearchVisibilityInbound
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (r :: EffectRow) feature.
(Member GalleyAPIAccess r, IsFeatureConfig feature,
 Typeable feature) =>
TeamId -> Sem r (LockableFeature feature)
getFeatureConfigForTeam @_ @SearchVisibilityInboundConfig TeamId
tid