{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Cassandra.Exec
( params,
paramsP,
x5,
x1,
paginateC,
GeneralPaginationState (..),
paginationStateCassandra,
paginationStatePostgres,
PageWithState (..),
paginateWithState,
paginateWithStateC,
paramsPagingState,
pwsHasMore,
module C,
)
where
import Cassandra.CQL (Consistency, R)
import Control.Monad.Catch
import Data.Conduit
import Database.CQL.IO (ProtocolError (UnexpectedResponse), RetrySettings, RunQ, defRetrySettings, eagerRetrySettings, getResult, hrHost, hrResponse, runQ)
import Database.CQL.IO as C (BatchM, Client, ClientState, MonadClient, Page (..), PrepQuery, Row, addPrepQuery, addQuery, adjustConsistency, adjustResponseTimeout, adjustSendTimeout, batch, emptyPage, init, liftClient, localState, paginate, prepared, query, query1, queryString, retry, runClient, schema, setConsistency, setSerialConsistency, setType, shutdown, trans, write)
import Database.CQL.Protocol (Error, QueryParams (QueryParams), Tuple, pagingState)
import Database.CQL.Protocol qualified as Protocol
import Imports hiding (init)
params :: Consistency -> a -> QueryParams a
params :: forall a. Consistency -> a -> QueryParams a
params Consistency
c a
p = Consistency
-> Bool
-> a
-> Maybe Int32
-> Maybe PagingState
-> Maybe SerialConsistency
-> Maybe Bool
-> QueryParams a
forall a.
Consistency
-> Bool
-> a
-> Maybe Int32
-> Maybe PagingState
-> Maybe SerialConsistency
-> Maybe Bool
-> QueryParams a
QueryParams Consistency
c Bool
False a
p Maybe Int32
forall a. Maybe a
Nothing Maybe PagingState
forall a. Maybe a
Nothing Maybe SerialConsistency
forall a. Maybe a
Nothing Maybe Bool
forall a. Maybe a
Nothing
{-# INLINE params #-}
paramsP :: Consistency -> a -> Int32 -> QueryParams a
paramsP :: forall a. Consistency -> a -> Int32 -> QueryParams a
paramsP Consistency
c a
p Int32
n = Consistency
-> Bool
-> a
-> Maybe Int32
-> Maybe PagingState
-> Maybe SerialConsistency
-> Maybe Bool
-> QueryParams a
forall a.
Consistency
-> Bool
-> a
-> Maybe Int32
-> Maybe PagingState
-> Maybe SerialConsistency
-> Maybe Bool
-> QueryParams a
QueryParams Consistency
c Bool
False a
p (Int32 -> Maybe Int32
forall a. a -> Maybe a
Just Int32
n) Maybe PagingState
forall a. Maybe a
Nothing Maybe SerialConsistency
forall a. Maybe a
Nothing Maybe Bool
forall a. Maybe a
Nothing
{-# INLINE paramsP #-}
x5 :: RetrySettings
x5 :: RetrySettings
x5 = RetrySettings
eagerRetrySettings
{-# INLINE x5 #-}
x1 :: RetrySettings
x1 :: RetrySettings
x1 = RetrySettings
defRetrySettings
{-# INLINE x1 #-}
data CassandraError
= Cassandra !Error
| Comm !IOException
| InvalidData !Text
| Other !SomeException
deriving (Int -> CassandraError -> ShowS
[CassandraError] -> ShowS
CassandraError -> String
(Int -> CassandraError -> ShowS)
-> (CassandraError -> String)
-> ([CassandraError] -> ShowS)
-> Show CassandraError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> CassandraError -> ShowS
showsPrec :: Int -> CassandraError -> ShowS
$cshow :: CassandraError -> String
show :: CassandraError -> String
$cshowList :: [CassandraError] -> ShowS
showList :: [CassandraError] -> ShowS
Show)
paginateC ::
(Tuple a, Tuple b, RunQ q, MonadClient m) =>
q R a b ->
QueryParams a ->
RetrySettings ->
ConduitM () [b] m ()
paginateC :: forall a b (q :: * -> * -> * -> *) (m :: * -> *).
(Tuple a, Tuple b, RunQ q, MonadClient m) =>
q R a b -> QueryParams a -> RetrySettings -> ConduitM () [b] m ()
paginateC q R a b
q QueryParams a
p RetrySettings
r = Page b -> ConduitT () [b] m ()
go (Page b -> ConduitT () [b] m ())
-> ConduitT () [b] m (Page b) -> ConduitT () [b] m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< m (Page b) -> ConduitT () [b] m (Page b)
forall (m :: * -> *) a. Monad m => m a -> ConduitT () [b] m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (RetrySettings -> m (Page b) -> m (Page b)
forall (m :: * -> *) a.
MonadClient m =>
RetrySettings -> m a -> m a
retry RetrySettings
r (q R a b -> QueryParams a -> m (Page b)
forall (m :: * -> *) a b (q :: * -> * -> * -> *).
(MonadClient m, Tuple a, Tuple b, RunQ q) =>
q R a b -> QueryParams a -> m (Page b)
paginate q R a b
q QueryParams a
p))
where
go :: Page b -> ConduitT () [b] m ()
go Page b
page = do
Bool -> ConduitT () [b] m () -> ConduitT () [b] m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([b] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null (Page b -> [b]
forall a. Page a -> [a]
result Page b
page)) (ConduitT () [b] m () -> ConduitT () [b] m ())
-> ConduitT () [b] m () -> ConduitT () [b] m ()
forall a b. (a -> b) -> a -> b
$
[b] -> ConduitT () [b] m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (Page b -> [b]
forall a. Page a -> [a]
result Page b
page)
Bool -> ConduitT () [b] m () -> ConduitT () [b] m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Page b -> Bool
forall a. Page a -> Bool
hasMore Page b
page) (ConduitT () [b] m () -> ConduitT () [b] m ())
-> ConduitT () [b] m () -> ConduitT () [b] m ()
forall a b. (a -> b) -> a -> b
$
Page b -> ConduitT () [b] m ()
go (Page b -> ConduitT () [b] m ())
-> ConduitT () [b] m (Page b) -> ConduitT () [b] m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< m (Page b) -> ConduitT () [b] m (Page b)
forall (m :: * -> *) a. Monad m => m a -> ConduitT () [b] m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (RetrySettings -> m (Page b) -> m (Page b)
forall (m :: * -> *) a.
MonadClient m =>
RetrySettings -> m a -> m a
retry RetrySettings
r (Client (Page b) -> m (Page b)
forall a. Client a -> m a
forall (m :: * -> *) a. MonadClient m => Client a -> m a
liftClient (Page b -> Client (Page b)
forall a. Page a -> Client (Page a)
nextPage Page b
page)))
data a
= PaginationStateCassandra Protocol.PagingState
| a
paginationStateCassandra :: GeneralPaginationState pgState -> Maybe Protocol.PagingState
paginationStateCassandra :: forall pgState. GeneralPaginationState pgState -> Maybe PagingState
paginationStateCassandra = \case
PaginationStateCassandra PagingState
state -> PagingState -> Maybe PagingState
forall a. a -> Maybe a
Just PagingState
state
PaginationStatePostgres {} -> Maybe PagingState
forall a. Maybe a
Nothing
paginationStatePostgres :: GeneralPaginationState pgState -> Maybe pgState
= \case
PaginationStatePostgres pgState
pgState -> pgState -> Maybe pgState
forall a. a -> Maybe a
Just pgState
pgState
PaginationStateCassandra {} -> Maybe pgState
forall a. Maybe a
Nothing
data PageWithState state res = PageWithState
{ forall state res. PageWithState state res -> [res]
pwsResults :: [res],
forall state res.
PageWithState state res -> Maybe (GeneralPaginationState state)
pwsState :: Maybe (GeneralPaginationState state)
}
deriving ((forall a b.
(a -> b) -> PageWithState state a -> PageWithState state b)
-> (forall a b.
a -> PageWithState state b -> PageWithState state a)
-> Functor (PageWithState state)
forall a b. a -> PageWithState state b -> PageWithState state a
forall a b.
(a -> b) -> PageWithState state a -> PageWithState state b
forall state a b.
a -> PageWithState state b -> PageWithState state a
forall state a b.
(a -> b) -> PageWithState state a -> PageWithState state b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall state a b.
(a -> b) -> PageWithState state a -> PageWithState state b
fmap :: forall a b.
(a -> b) -> PageWithState state a -> PageWithState state b
$c<$ :: forall state a b.
a -> PageWithState state b -> PageWithState state a
<$ :: forall a b. a -> PageWithState state b -> PageWithState state a
Functor)
paginateWithState :: (MonadClient m, Tuple a, Tuple b, RunQ q) => q R a b -> QueryParams a -> RetrySettings -> m (PageWithState x b)
paginateWithState :: forall (m :: * -> *) a b (q :: * -> * -> * -> *) x.
(MonadClient m, Tuple a, Tuple b, RunQ q) =>
q R a b -> QueryParams a -> RetrySettings -> m (PageWithState x b)
paginateWithState q R a b
q QueryParams a
p RetrySettings
retrySettings = do
let p' :: QueryParams a
p' = QueryParams a
p {Protocol.pageSize = Protocol.pageSize p <|> Just 10000}
r <- q R a b -> QueryParams a -> m (HostResponse R a b)
forall (m :: * -> *) a b k.
(MonadClient m, Tuple a, Tuple b) =>
q k a b -> QueryParams a -> m (HostResponse k a b)
forall (q :: * -> * -> * -> *) (m :: * -> *) a b k.
(RunQ q, MonadClient m, Tuple a, Tuple b) =>
q k a b -> QueryParams a -> m (HostResponse k a b)
runQ q R a b
q QueryParams a
p'
retry retrySettings (getResult r) >>= \case
Protocol.RowsResult MetaData
m [b]
b ->
PageWithState x b -> m (PageWithState x b)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (PageWithState x b -> m (PageWithState x b))
-> PageWithState x b -> m (PageWithState x b)
forall a b. (a -> b) -> a -> b
$ [b] -> Maybe (GeneralPaginationState x) -> PageWithState x b
forall state res.
[res]
-> Maybe (GeneralPaginationState state) -> PageWithState state res
PageWithState [b]
b (PagingState -> GeneralPaginationState x
forall a. PagingState -> GeneralPaginationState a
PaginationStateCassandra (PagingState -> GeneralPaginationState x)
-> Maybe PagingState -> Maybe (GeneralPaginationState x)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MetaData -> Maybe PagingState
pagingState MetaData
m)
Result R a b
_ -> ProtocolError -> m (PageWithState x b)
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM (ProtocolError -> m (PageWithState x b))
-> ProtocolError -> m (PageWithState x b)
forall a b. (a -> b) -> a -> b
$ Host -> Response R a b -> ProtocolError
forall k a b. Host -> Response k a b -> ProtocolError
UnexpectedResponse (HostResponse R a b -> Host
forall k a b. HostResponse k a b -> Host
hrHost HostResponse R a b
r) (HostResponse R a b -> Response R a b
forall k a b. HostResponse k a b -> Response k a b
hrResponse HostResponse R a b
r)
paginateWithStateC :: forall m res state. (Monad m) => (Maybe (GeneralPaginationState state) -> m (PageWithState state res)) -> ConduitT () [res] m ()
paginateWithStateC :: forall (m :: * -> *) res state.
Monad m =>
(Maybe (GeneralPaginationState state)
-> m (PageWithState state res))
-> ConduitT () [res] m ()
paginateWithStateC Maybe (GeneralPaginationState state) -> m (PageWithState state res)
getPage = do
PageWithState state res -> ConduitT () [res] m ()
go (PageWithState state res -> ConduitT () [res] m ())
-> ConduitT () [res] m (PageWithState state res)
-> ConduitT () [res] m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< m (PageWithState state res)
-> ConduitT () [res] m (PageWithState state res)
forall (m :: * -> *) a. Monad m => m a -> ConduitT () [res] m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Maybe (GeneralPaginationState state) -> m (PageWithState state res)
getPage Maybe (GeneralPaginationState state)
forall a. Maybe a
Nothing)
where
go :: PageWithState state res -> ConduitT () [res] m ()
go :: PageWithState state res -> ConduitT () [res] m ()
go PageWithState state res
page = do
Bool -> ConduitT () [res] m () -> ConduitT () [res] m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([res] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null PageWithState state res
page.pwsResults) (ConduitT () [res] m () -> ConduitT () [res] m ())
-> ConduitT () [res] m () -> ConduitT () [res] m ()
forall a b. (a -> b) -> a -> b
$
[res] -> ConduitT () [res] m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (PageWithState state res
page.pwsResults)
Bool -> ConduitT () [res] m () -> ConduitT () [res] m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (PageWithState state res -> Bool
forall a b. PageWithState a b -> Bool
pwsHasMore PageWithState state res
page) (ConduitT () [res] m () -> ConduitT () [res] m ())
-> ConduitT () [res] m () -> ConduitT () [res] m ()
forall a b. (a -> b) -> a -> b
$
PageWithState state res -> ConduitT () [res] m ()
go (PageWithState state res -> ConduitT () [res] m ())
-> ConduitT () [res] m (PageWithState state res)
-> ConduitT () [res] m ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< m (PageWithState state res)
-> ConduitT () [res] m (PageWithState state res)
forall (m :: * -> *) a. Monad m => m a -> ConduitT () [res] m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Maybe (GeneralPaginationState state) -> m (PageWithState state res)
getPage (Maybe (GeneralPaginationState state)
-> m (PageWithState state res))
-> Maybe (GeneralPaginationState state)
-> m (PageWithState state res)
forall a b. (a -> b) -> a -> b
$ PageWithState state res
page.pwsState)
paramsPagingState :: Consistency -> a -> Int32 -> Maybe Protocol.PagingState -> QueryParams a
paramsPagingState :: forall a.
Consistency -> a -> Int32 -> Maybe PagingState -> QueryParams a
paramsPagingState Consistency
c a
p Int32
n Maybe PagingState
state = Consistency
-> Bool
-> a
-> Maybe Int32
-> Maybe PagingState
-> Maybe SerialConsistency
-> Maybe Bool
-> QueryParams a
forall a.
Consistency
-> Bool
-> a
-> Maybe Int32
-> Maybe PagingState
-> Maybe SerialConsistency
-> Maybe Bool
-> QueryParams a
QueryParams Consistency
c Bool
False a
p (Int32 -> Maybe Int32
forall a. a -> Maybe a
Just Int32
n) Maybe PagingState
state Maybe SerialConsistency
forall a. Maybe a
Nothing Maybe Bool
forall a. Maybe a
Nothing
{-# INLINE paramsPagingState #-}
pwsHasMore :: PageWithState a b -> Bool
pwsHasMore :: forall a b. PageWithState a b -> Bool
pwsHasMore = Maybe (GeneralPaginationState a) -> Bool
forall a. Maybe a -> Bool
isJust (Maybe (GeneralPaginationState a) -> Bool)
-> (PageWithState a b -> Maybe (GeneralPaginationState a))
-> PageWithState a b
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PageWithState a b -> Maybe (GeneralPaginationState a)
forall state res.
PageWithState state res -> Maybe (GeneralPaginationState state)
pwsState