module Wire.IndexedUserStore.Bulk.ElasticSearch where import Cassandra.Exec (paginateWithStateC) import Conduit (ConduitT, runConduit, (.|)) import Data.Conduit.Combinators qualified as Conduit import Data.Id import Data.Map qualified as Map import Data.Set qualified as Set import Database.Bloodhound qualified as ES import Imports import Polysemy import Polysemy.Error import Polysemy.TinyLog import Polysemy.TinyLog qualified as Log import System.Logger.Message qualified as Log import Wire.API.Team.Feature import Wire.GalleyAPIAccess import Wire.IndexedUserStore (IndexedUserStore) import Wire.IndexedUserStore qualified as IndexedUserStore import Wire.IndexedUserStore.Bulk import Wire.IndexedUserStore.MigrationStore import Wire.IndexedUserStore.MigrationStore qualified as MigrationStore import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (Unsafe), unsafePooledForConcurrentlyN) import Wire.UserSearch.Migration import Wire.UserSearch.Types import Wire.UserStore import Wire.UserStore.IndexUser interpretIndexedUserStoreBulk :: ( Member TinyLog r, Member UserStore r, Member (Concurrency Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r, Member (Error MigrationException) r, Member IndexedUserMigrationStore r ) => InterpreterFor IndexedUserStoreBulk r interpretIndexedUserStoreBulk :: forall (r :: EffectRow). (Member TinyLog r, Member UserStore r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r, Member (Error MigrationException) r, Member IndexedUserMigrationStore r) => InterpreterFor IndexedUserStoreBulk r interpretIndexedUserStoreBulk = (forall (rInitial :: EffectRow) x. IndexedUserStoreBulk (Sem rInitial) x -> Sem r x) -> Sem (IndexedUserStoreBulk : r) a -> Sem r a forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a. FirstOrder e "interpret" => (forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x) -> Sem (e : r) a -> Sem r a interpret \case IndexedUserStoreBulk (Sem rInitial) x SyncAllUsers -> Sem r x Sem r () forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => Sem r () syncAllUsersImpl IndexedUserStoreBulk (Sem rInitial) x ForceSyncAllUsers -> Sem r x Sem r () forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => Sem r () forceSyncAllUsersImpl IndexedUserStoreBulk (Sem rInitial) x MigrateData -> Sem r x Sem r () forall (r :: EffectRow). (Member IndexedUserStore r, Member (Error MigrationException) r, Member IndexedUserMigrationStore r, Member UserStore r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member TinyLog r) => Sem r () migrateDataImpl syncAllUsersImpl :: forall r. ( Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r ) => Sem r () syncAllUsersImpl :: forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => Sem r () syncAllUsersImpl = (ExternalDocVersion -> VersionControl) -> Sem r () forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => (ExternalDocVersion -> VersionControl) -> Sem r () syncAllUsersWithVersion ExternalDocVersion -> VersionControl ES.ExternalGT forceSyncAllUsersImpl :: forall r. ( Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r ) => Sem r () forceSyncAllUsersImpl :: forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => Sem r () forceSyncAllUsersImpl = (ExternalDocVersion -> VersionControl) -> Sem r () forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => (ExternalDocVersion -> VersionControl) -> Sem r () syncAllUsersWithVersion ExternalDocVersion -> VersionControl ES.ExternalGTE syncAllUsersWithVersion :: forall r. ( Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r ) => (ES.ExternalDocVersion -> ES.VersionControl) -> Sem r () syncAllUsersWithVersion :: forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => (ExternalDocVersion -> VersionControl) -> Sem r () syncAllUsersWithVersion ExternalDocVersion -> VersionControl mkVersion = ConduitT () Void (Sem r) () -> Sem r () forall (m :: * -> *) r. Monad m => ConduitT () Void m r -> m r runConduit (ConduitT () Void (Sem r) () -> Sem r ()) -> ConduitT () Void (Sem r) () -> Sem r () forall a b. (a -> b) -> a -> b $ (Maybe PagingState -> Sem r (PageWithState IndexUser)) -> ConduitT () [IndexUser] (Sem r) () forall (m :: * -> *) a. Monad m => (Maybe PagingState -> m (PageWithState a)) -> ConduitT () [a] m () paginateWithStateC (Int32 -> Maybe PagingState -> Sem r (PageWithState IndexUser) forall (r :: EffectRow). Member UserStore r => Int32 -> Maybe PagingState -> Sem r (PageWithState IndexUser) getIndexUsersPaginated Int32 1000) ConduitT () [IndexUser] (Sem r) () -> ConduitT [IndexUser] Void (Sem r) () -> ConduitT () Void (Sem r) () forall (m :: * -> *) a b c r. Monad m => ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r .| ConduitT [IndexUser] [IndexUser] (Sem r) () logPage ConduitT [IndexUser] [IndexUser] (Sem r) () -> ConduitT [IndexUser] Void (Sem r) () -> ConduitT [IndexUser] Void (Sem r) () forall (m :: * -> *) a b c r. Monad m => ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r .| ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) () mkUserDocs ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) () -> ConduitT [(DocId, UserDoc, VersionControl)] Void (Sem r) () -> ConduitT [IndexUser] Void (Sem r) () forall (m :: * -> *) a b c r. Monad m => ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r .| ([(DocId, UserDoc, VersionControl)] -> Sem r ()) -> ConduitT [(DocId, UserDoc, VersionControl)] Void (Sem r) () forall (m :: * -> *) a o. Monad m => (a -> m ()) -> ConduitT a o m () Conduit.mapM_ [(DocId, UserDoc, VersionControl)] -> Sem r () forall (r :: EffectRow). Member IndexedUserStore r => [(DocId, UserDoc, VersionControl)] -> Sem r () IndexedUserStore.bulkUpsert where logPage :: ConduitT [IndexUser] [IndexUser] (Sem r) () logPage :: ConduitT [IndexUser] [IndexUser] (Sem r) () logPage = ([IndexUser] -> Sem r ()) -> ConduitT [IndexUser] [IndexUser] (Sem r) () forall (m :: * -> *) a. Monad m => (a -> m ()) -> ConduitT a a m () Conduit.iterM (([IndexUser] -> Sem r ()) -> ConduitT [IndexUser] [IndexUser] (Sem r) ()) -> ([IndexUser] -> Sem r ()) -> ConduitT [IndexUser] [IndexUser] (Sem r) () forall a b. (a -> b) -> a -> b $ \[IndexUser] page -> do (Msg -> Msg) -> Sem r () forall msg (r :: EffectRow). Member (Logger msg) r => msg -> Sem r () info ((Msg -> Msg) -> Sem r ()) -> (Msg -> Msg) -> Sem r () forall a b. (a -> b) -> a -> b $ ByteString -> Int -> Msg -> Msg forall a. ToBytes a => ByteString -> a -> Msg -> Msg Log.field ByteString "size" ([IndexUser] -> Int forall a. [a] -> Int forall (t :: * -> *) a. Foldable t => t a -> Int length [IndexUser] page) (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg forall b c a. (b -> c) -> (a -> b) -> a -> c . Builder -> Msg -> Msg forall a. ToBytes a => a -> Msg -> Msg Log.msg (ByteString -> Builder Log.val ByteString "Reindex: processing C* page") mkUserDocs :: ConduitT [IndexUser] [(ES.DocId, UserDoc, ES.VersionControl)] (Sem r) () mkUserDocs :: ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) () mkUserDocs = ([IndexUser] -> Sem r [(DocId, UserDoc, VersionControl)]) -> ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) () forall (m :: * -> *) a b. Monad m => (a -> m b) -> ConduitT a b m () Conduit.mapM (([IndexUser] -> Sem r [(DocId, UserDoc, VersionControl)]) -> ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) ()) -> ([IndexUser] -> Sem r [(DocId, UserDoc, VersionControl)]) -> ConduitT [IndexUser] [(DocId, UserDoc, VersionControl)] (Sem r) () forall a b. (a -> b) -> a -> b $ \[IndexUser] page -> do let teamIds :: Set TeamId teamIds = [TeamId] -> Set TeamId forall a. Ord a => [a] -> Set a Set.fromList ([TeamId] -> Set TeamId) -> [TeamId] -> Set TeamId forall a b. (a -> b) -> a -> b $ (IndexUser -> Maybe TeamId) -> [IndexUser] -> [TeamId] forall a b. (a -> Maybe b) -> [a] -> [b] mapMaybe ((WithWritetime TeamId -> TeamId) -> Maybe (WithWritetime TeamId) -> Maybe TeamId forall a b. (a -> b) -> Maybe a -> Maybe b forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b fmap WithWritetime TeamId -> TeamId forall a. WithWritetime a -> a value (Maybe (WithWritetime TeamId) -> Maybe TeamId) -> (IndexUser -> Maybe (WithWritetime TeamId)) -> IndexUser -> Maybe TeamId forall b c a. (b -> c) -> (a -> b) -> a -> c . ((.teamId))) [IndexUser] page Map TeamId SearchVisibilityInbound visMap <- ([(TeamId, SearchVisibilityInbound)] -> Map TeamId SearchVisibilityInbound) -> Sem r [(TeamId, SearchVisibilityInbound)] -> Sem r (Map TeamId SearchVisibilityInbound) forall a b. (a -> b) -> Sem r a -> Sem r b forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b fmap [(TeamId, SearchVisibilityInbound)] -> Map TeamId SearchVisibilityInbound forall k a. Ord k => [(k, a)] -> Map k a Map.fromList (Sem r [(TeamId, SearchVisibilityInbound)] -> Sem r (Map TeamId SearchVisibilityInbound)) -> ((TeamId -> Sem r (TeamId, SearchVisibilityInbound)) -> Sem r [(TeamId, SearchVisibilityInbound)]) -> (TeamId -> Sem r (TeamId, SearchVisibilityInbound)) -> Sem r (Map TeamId SearchVisibilityInbound) forall b c a. (b -> c) -> (a -> b) -> a -> c . Int -> Set TeamId -> (TeamId -> Sem r (TeamId, SearchVisibilityInbound)) -> Sem r [(TeamId, SearchVisibilityInbound)] forall (r :: EffectRow) (t :: * -> *) a b. (Member (Concurrency 'Unsafe) r, Foldable t) => Int -> t a -> (a -> Sem r b) -> Sem r [b] unsafePooledForConcurrentlyN Int 16 Set TeamId teamIds ((TeamId -> Sem r (TeamId, SearchVisibilityInbound)) -> Sem r (Map TeamId SearchVisibilityInbound)) -> (TeamId -> Sem r (TeamId, SearchVisibilityInbound)) -> Sem r (Map TeamId SearchVisibilityInbound) forall a b. (a -> b) -> a -> b $ \TeamId t -> (TeamId t,) (SearchVisibilityInbound -> (TeamId, SearchVisibilityInbound)) -> Sem r SearchVisibilityInbound -> Sem r (TeamId, SearchVisibilityInbound) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b <$> TeamId -> Sem r SearchVisibilityInbound forall (r :: EffectRow). Member GalleyAPIAccess r => TeamId -> Sem r SearchVisibilityInbound teamSearchVisibilityInbound TeamId t let vis :: IndexUser -> SearchVisibilityInbound vis IndexUser indexUser = SearchVisibilityInbound -> Maybe SearchVisibilityInbound -> SearchVisibilityInbound forall a. a -> Maybe a -> a fromMaybe SearchVisibilityInbound defaultSearchVisibilityInbound (Maybe SearchVisibilityInbound -> SearchVisibilityInbound) -> Maybe SearchVisibilityInbound -> SearchVisibilityInbound forall a b. (a -> b) -> a -> b $ ((TeamId -> Map TeamId SearchVisibilityInbound -> Maybe SearchVisibilityInbound) -> Map TeamId SearchVisibilityInbound -> TeamId -> Maybe SearchVisibilityInbound forall a b c. (a -> b -> c) -> b -> a -> c flip TeamId -> Map TeamId SearchVisibilityInbound -> Maybe SearchVisibilityInbound forall k a. Ord k => k -> Map k a -> Maybe a Map.lookup Map TeamId SearchVisibilityInbound visMap (TeamId -> Maybe SearchVisibilityInbound) -> (WithWritetime TeamId -> TeamId) -> WithWritetime TeamId -> Maybe SearchVisibilityInbound forall b c a. (b -> c) -> (a -> b) -> a -> c . WithWritetime TeamId -> TeamId forall a. WithWritetime a -> a value (WithWritetime TeamId -> Maybe SearchVisibilityInbound) -> Maybe (WithWritetime TeamId) -> Maybe SearchVisibilityInbound forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b =<< IndexUser indexUser.teamId) mkUserDoc :: IndexUser -> UserDoc mkUserDoc IndexUser indexUser = SearchVisibilityInbound -> IndexUser -> UserDoc indexUserToDoc (IndexUser -> SearchVisibilityInbound vis IndexUser indexUser) IndexUser indexUser mkDocVersion :: IndexUser -> VersionControl mkDocVersion = ExternalDocVersion -> VersionControl mkVersion (ExternalDocVersion -> VersionControl) -> (IndexUser -> ExternalDocVersion) -> IndexUser -> VersionControl forall b c a. (b -> c) -> (a -> b) -> a -> c . DocVersion -> ExternalDocVersion ES.ExternalDocVersion (DocVersion -> ExternalDocVersion) -> (IndexUser -> DocVersion) -> IndexUser -> ExternalDocVersion forall b c a. (b -> c) -> (a -> b) -> a -> c . IndexVersion -> DocVersion docVersion (IndexVersion -> DocVersion) -> (IndexUser -> IndexVersion) -> IndexUser -> DocVersion forall b c a. (b -> c) -> (a -> b) -> a -> c . IndexUser -> IndexVersion indexUserToVersion [(DocId, UserDoc, VersionControl)] -> Sem r [(DocId, UserDoc, VersionControl)] forall a. a -> Sem r a forall (f :: * -> *) a. Applicative f => a -> f a pure ([(DocId, UserDoc, VersionControl)] -> Sem r [(DocId, UserDoc, VersionControl)]) -> [(DocId, UserDoc, VersionControl)] -> Sem r [(DocId, UserDoc, VersionControl)] forall a b. (a -> b) -> a -> b $ (IndexUser -> (DocId, UserDoc, VersionControl)) -> [IndexUser] -> [(DocId, UserDoc, VersionControl)] forall a b. (a -> b) -> [a] -> [b] map (\IndexUser u -> (UserId -> DocId userIdToDocId IndexUser u.userId, IndexUser -> UserDoc mkUserDoc IndexUser u, IndexUser -> VersionControl mkDocVersion IndexUser u)) [IndexUser] page migrateDataImpl :: ( Member IndexedUserStore r, Member (Error MigrationException) r, Member IndexedUserMigrationStore r, Member UserStore r, Member (Concurrency Unsafe) r, Member GalleyAPIAccess r, Member TinyLog r ) => Sem r () migrateDataImpl :: forall (r :: EffectRow). (Member IndexedUserStore r, Member (Error MigrationException) r, Member IndexedUserMigrationStore r, Member UserStore r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member TinyLog r) => Sem r () migrateDataImpl = do Sem r Bool -> Sem r () -> Sem r () forall (m :: * -> *). Monad m => m Bool -> m () -> m () unlessM Sem r Bool forall (r :: EffectRow). Member IndexedUserStore r => Sem r Bool IndexedUserStore.doesIndexExist (Sem r () -> Sem r ()) -> Sem r () -> Sem r () forall a b. (a -> b) -> a -> b $ MigrationException -> Sem r () forall e (r :: EffectRow) a. Member (Error e) r => e -> Sem r a throw MigrationException TargetIndexAbsent Sem r () forall (r :: EffectRow). Member IndexedUserMigrationStore r => Sem r () MigrationStore.ensureMigrationIndex MigrationVersion foundVersion <- Sem r MigrationVersion forall (r :: EffectRow). Member IndexedUserMigrationStore r => Sem r MigrationVersion MigrationStore.getLatestMigrationVersion if MigrationVersion expectedMigrationVersion MigrationVersion -> MigrationVersion -> Bool forall a. Ord a => a -> a -> Bool > MigrationVersion foundVersion then do (Msg -> Msg) -> Sem r () forall msg (r :: EffectRow). Member (Logger msg) r => msg -> Sem r () Log.info ((Msg -> Msg) -> Sem r ()) -> (Msg -> Msg) -> Sem r () forall a b. (a -> b) -> a -> b $ Builder -> Msg -> Msg forall a. ToBytes a => a -> Msg -> Msg Log.msg (ByteString -> Builder Log.val ByteString "Migration necessary.") (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg forall b c a. (b -> c) -> (a -> b) -> a -> c . ByteString -> MigrationVersion -> Msg -> Msg forall a. ToBytes a => ByteString -> a -> Msg -> Msg Log.field ByteString "expectedVersion" MigrationVersion expectedMigrationVersion (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg forall b c a. (b -> c) -> (a -> b) -> a -> c . ByteString -> MigrationVersion -> Msg -> Msg forall a. ToBytes a => ByteString -> a -> Msg -> Msg Log.field ByteString "foundVersion" MigrationVersion foundVersion Sem r () forall (r :: EffectRow). (Member UserStore r, Member TinyLog r, Member (Concurrency 'Unsafe) r, Member GalleyAPIAccess r, Member IndexedUserStore r) => Sem r () forceSyncAllUsersImpl MigrationVersion -> Sem r () forall (r :: EffectRow). Member IndexedUserMigrationStore r => MigrationVersion -> Sem r () MigrationStore.persistMigrationVersion MigrationVersion expectedMigrationVersion else do (Msg -> Msg) -> Sem r () forall msg (r :: EffectRow). Member (Logger msg) r => msg -> Sem r () Log.info ((Msg -> Msg) -> Sem r ()) -> (Msg -> Msg) -> Sem r () forall a b. (a -> b) -> a -> b $ Builder -> Msg -> Msg forall a. ToBytes a => a -> Msg -> Msg Log.msg (ByteString -> Builder Log.val ByteString "No migration necessary.") (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg forall b c a. (b -> c) -> (a -> b) -> a -> c . ByteString -> MigrationVersion -> Msg -> Msg forall a. ToBytes a => ByteString -> a -> Msg -> Msg Log.field ByteString "expectedVersion" MigrationVersion expectedMigrationVersion (Msg -> Msg) -> (Msg -> Msg) -> Msg -> Msg forall b c a. (b -> c) -> (a -> b) -> a -> c . ByteString -> MigrationVersion -> Msg -> Msg forall a. ToBytes a => ByteString -> a -> Msg -> Msg Log.field ByteString "foundVersion" MigrationVersion foundVersion teamSearchVisibilityInbound :: (Member GalleyAPIAccess r) => TeamId -> Sem r SearchVisibilityInbound teamSearchVisibilityInbound :: forall (r :: EffectRow). Member GalleyAPIAccess r => TeamId -> Sem r SearchVisibilityInbound teamSearchVisibilityInbound TeamId tid = FeatureStatus -> SearchVisibilityInbound searchVisibilityInboundFromFeatureStatus (FeatureStatus -> SearchVisibilityInbound) -> (LockableFeature SearchVisibilityInboundConfig -> FeatureStatus) -> LockableFeature SearchVisibilityInboundConfig -> SearchVisibilityInbound forall b c a. (b -> c) -> (a -> b) -> a -> c . (.status) (LockableFeature SearchVisibilityInboundConfig -> SearchVisibilityInbound) -> Sem r (LockableFeature SearchVisibilityInboundConfig) -> Sem r SearchVisibilityInbound forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b <$> forall (r :: EffectRow) feature. (Member GalleyAPIAccess r, IsFeatureConfig feature, Typeable feature) => TeamId -> Sem r (LockableFeature feature) getFeatureConfigForTeam @_ @SearchVisibilityInboundConfig TeamId tid