module Hasql.Pool
(
Pool,
acquire,
use,
release,
UsageError (..),
)
where
import Data.Text.Encoding qualified as Text
import Data.Text.Encoding.Error qualified as Text
import Data.UUID.V4 qualified as Uuid
import Hasql.Connection (Connection)
import Hasql.Connection qualified as Connection
import Hasql.Connection.Setting qualified as Connection.Setting
import Hasql.Pool.Config.Config qualified as Config
import Hasql.Pool.Observation
import Hasql.Pool.Prelude
import Hasql.Pool.SessionErrorDestructors qualified as ErrorsDestruction
import Hasql.Session qualified as Session
-- | A pooled connection together with the bookkeeping the pool needs to
-- decide when the connection has to be retired.
data Entry = Entry
  { -- | The underlying database connection.
    entryConnection :: Connection,
    -- | Monotonic clock reading (nanoseconds) taken when the connection
    -- was being established.
    entryCreationTimeNSec :: Word64,
    -- | Monotonic clock reading (nanoseconds) taken the last time the
    -- connection was handed out for use.
    entryUseTimeNSec :: Word64,
    -- | Identifier attached to every observation reported about this
    -- connection.
    entryId :: UUID
  }
-- | Tell whether an entry has outlived the maximal connection lifetime.
--
-- Arguments, in order: the maximal lifetime in nanoseconds, the current
-- monotonic time in nanoseconds, and the entry to inspect.
--
-- The elapsed time is computed by subtraction rather than by adding the
-- limit to the creation time: 'Word64' addition silently wraps around,
-- which would make a very large limit report a fresh connection as aged.
-- The @now > createdAt@ guard keeps the subtraction itself from
-- underflowing when the clock reading predates the entry.
entryIsAged :: Word64 -> Word64 -> Entry -> Bool
entryIsAged maxLifetime now Entry {entryCreationTimeNSec = createdAt} =
  now > createdAt && now - createdAt > maxLifetime
-- | Tell whether an entry has been sitting unused for longer than the
-- maximal idle time.
--
-- Arguments, in order: the maximal idle time in nanoseconds, the current
-- monotonic time in nanoseconds, and the entry to inspect.
--
-- The elapsed time is computed by subtraction rather than by adding the
-- limit to the last-use time: 'Word64' addition silently wraps around,
-- which would make a very large limit report a just-used connection as
-- idle. The @now > lastUsedAt@ guard keeps the subtraction itself from
-- underflowing when the clock reading predates the last use.
entryIsIdle :: Word64 -> Word64 -> Entry -> Bool
entryIsIdle maxIdletime now Entry {entryUseTimeNSec = lastUsedAt} =
  now > lastUsedAt && now - lastUsedAt > maxIdletime
-- | A pool of database connections.
data Pool = Pool
  { -- | Configured pool size.
    poolSize :: Int,
    -- | Action producing the settings used to establish each new connection.
    poolFetchConnectionSettings :: IO [Connection.Setting.Setting],
    -- | How long 'use' waits for a connection, in microseconds.
    poolAcquisitionTimeout :: Int,
    -- | Maximal connection lifetime, in nanoseconds.
    poolMaxLifetime :: Word64,
    -- | Maximal time a connection may stay unused, in nanoseconds.
    poolMaxIdletime :: Word64,
    -- | Connections that are currently available for reuse.
    poolConnectionQueue :: TQueue Entry,
    -- | Number of connections that may still be established.
    poolCapacity :: TVar Int,
    -- | Flag consulted when a connection is handed back: it is re-queued
    -- only while the inner variable holds 'True'. 'release' flips the
    -- current inner variable to 'False' and installs a fresh one.
    poolReuseVar :: TVar (TVar Bool),
    -- | Reference whose finalizer kills the background manager thread.
    poolReaperRef :: IORef (),
    -- | Callback receiving the observations about connection status.
    poolObserver :: Observation -> IO (),
    -- | Session run on every freshly established connection before it
    -- is put to use.
    poolInitSession :: Session.Session ()
  }
-- | Create a connection pool from the given configuration.
--
-- No connection is established here; connections are opened on demand
-- by 'use'.
--
-- A manager thread is forked which wakes up once a second, takes every
-- queued (i.e. currently unused) connection and retires those that have
-- exceeded the aging timeout or the idleness timeout, putting the rest
-- back in the queue. The thread is killed by a finalizer attached to the
-- pool's reaper reference.
acquire :: Config.Config -> IO Pool
acquire config = do
  connectionQueue <- newTQueueIO
  capVar <- newTVarIO (Config.size config)
  reuseVar <- newTVarIO =<< newTVarIO True
  reaperRef <- newIORef ()

  let observe = Config.observationHandler config

      -- Close a retired connection, give its slot back to the pool and
      -- report the reason of the termination.
      retire reason entry = do
        Connection.release (entryConnection entry)
        atomically $ modifyTVar' capVar succ
        observe (ConnectionObservation (entryId entry) (TerminatedConnectionStatus reason))

  managerTid <- forkIOWithUnmask $ \unmask -> unmask $ forever $ do
    threadDelay 1000000
    now <- getMonotonicTimeNSec
    -- The transaction only decides which entries go; the actual closing
    -- is IO and is therefore returned as an action to run afterwards.
    join . atomically $ do
      entries <- flushTQueue connectionQueue
      let (agedEntries, unagedEntries) = partition (entryIsAged agingTimeoutNanos now) entries
          -- Idleness is judged against the idleness timeout, not the
          -- aging one.
          (idleEntries, liveEntries) = partition (entryIsIdle maxIdletimeNanos now) unagedEntries
      traverse_ (writeTQueue connectionQueue) liveEntries
      return $ do
        forM_ agedEntries (retire AgingConnectionTerminationReason)
        forM_ idleEntries (retire IdlenessConnectionTerminationReason)

  -- Stop the manager thread once the reaper reference gets finalized.
  void . mkWeakIORef reaperRef $ killThread managerTid

  return
    Pool
      { poolSize = Config.size config,
        poolFetchConnectionSettings = Config.connectionSettingsProvider config,
        poolAcquisitionTimeout = acqTimeoutMicros,
        poolMaxLifetime = agingTimeoutNanos,
        poolMaxIdletime = maxIdletimeNanos,
        poolConnectionQueue = connectionQueue,
        poolCapacity = capVar,
        poolReuseVar = reuseVar,
        poolReaperRef = reaperRef,
        poolObserver = observe,
        poolInitSession = Config.initSession config
      }
  where
    -- The unit conversions below divide in 'Integer' before narrowing,
    -- so that long timeouts do not wrap around while still expressed in
    -- picoseconds.
    acqTimeoutMicros :: Int
    acqTimeoutMicros =
      fromIntegral (diffTimeToPicoseconds (Config.acquisitionTimeout config) `div` 1_000_000)
    agingTimeoutNanos :: Word64
    agingTimeoutNanos =
      fromIntegral (diffTimeToPicoseconds (Config.agingTimeout config) `div` 1_000)
    maxIdletimeNanos :: Word64
    maxIdletimeNanos =
      fromIntegral (diffTimeToPicoseconds (Config.idlenessTimeout config) `div` 1_000)
-- | Close every connection that is currently sitting unused in the pool
-- and mark the ones in use so that they get closed when handed back.
--
-- The pool stays usable afterwards: a fresh reuse flag is installed, so
-- connections established after this call are pooled as usual.
release :: Pool -> IO ()
release Pool {..} = do
  queuedEntries <- atomically $ do
    -- Connections handed out earlier hold on to the old flag and will
    -- see 'False' when they are returned.
    readTVar poolReuseVar >>= \staleFlag -> writeTVar staleFlag False
    writeTVar poolReuseVar =<< newTVar True
    flushTQueue poolConnectionQueue
  traverse_ closeEntry queuedEntries
  where
    -- Close the connection, give its slot back and report the release.
    closeEntry entry = do
      Connection.release (entryConnection entry)
      atomically $ modifyTVar' poolCapacity succ
      poolObserver
        ( ConnectionObservation
            (entryId entry)
            (TerminatedConnectionStatus ReleaseConnectionTerminationReason)
        )
-- | Run a session on a connection taken from the pool, handing the
-- connection back when finished.
--
-- A connection is obtained by the first of these that succeeds: taking
-- an unused one from the queue, claiming a free slot and establishing a
-- new connection, or giving up with 'AcquisitionTimeoutUsageError' once
-- the acquisition timeout has elapsed.
--
-- A queued connection found to be aged or idle is closed and replaced
-- by a fresh one in the same slot.
--
-- When the session fails and @ErrorsDestruction.reset@ routes the error
-- to its connection-error handler, the connection is closed and its slot
-- is freed instead of being returned to the pool. Any other session
-- error leaves the connection in the pool. In both cases the error is
-- returned as 'SessionUsageError'.
--
-- An exception thrown by the session is rethrown after the connection
-- has been handed back.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess = do
  timeout <- do
    delay <- registerDelay poolAcquisitionTimeout
    return $ readTVar delay
  -- The transaction only picks the strategy; the chosen IO action is
  -- run after it commits.
  join . atomically $ do
    reuseVar <- readTVar poolReuseVar
    asum
      [ readTQueue poolConnectionQueue <&> onConn reuseVar,
        do
          capVal <- readTVar poolCapacity
          if capVal > 0
            then do
              writeTVar poolCapacity $! pred capVal
              return $ onNewConn reuseVar
            else retry,
        do
          timedOut <- timeout
          if timedOut
            then return . return . Left $ AcquisitionTimeoutUsageError
            else retry
      ]
  where
    -- Report a status change of the connection with the given id.
    observe connId status =
      poolObserver (ConnectionObservation connId status)

    -- Termination status for a connection-level failure.
    networkFailure details =
      TerminatedConnectionStatus
        ( NetworkErrorConnectionTerminationReason
            (fmap (Text.decodeUtf8With Text.lenientDecode) details)
        )

    -- Give a slot back to the pool.
    freeSlot = atomically $ modifyTVar' poolCapacity succ

    -- Establish a connection in an already reserved slot, initialize it
    -- and run the session on it. The slot is given back on every path
    -- that ends without a live connection.
    onNewConn reuseVar = do
      settings <- poolFetchConnectionSettings
      now <- getMonotonicTimeNSec
      connId <- Uuid.nextRandom
      observe connId ConnectingConnectionStatus
      Connection.acquire settings >>= \case
        Left connErr -> do
          observe connId (networkFailure connErr)
          freeSlot
          return $ Left $ ConnectionUsageError connErr
        Right connection ->
          Session.run poolInitSession connection >>= \case
            Left err -> do
              Connection.release connection
              -- The connection is gone and nothing is queued in its
              -- place, so the reserved slot must be returned; otherwise
              -- every failed initialization shrinks the pool for good.
              freeSlot
              ErrorsDestruction.reset
                (\details -> observe connId (networkFailure details))
                (observe connId (TerminatedConnectionStatus (InitializationErrorTerminationReason err)))
                err
              return $ Left $ SessionUsageError err
            Right () -> do
              observe connId (ReadyForUseConnectionStatus EstablishedConnectionReadyForUseReason)
              onLiveConn reuseVar (Entry connection now now connId)

    -- Handle a connection taken from the queue: replace it when it is
    -- too old or has been unused for too long, otherwise use it.
    onConn reuseVar entry = do
      now <- getMonotonicTimeNSec
      if entryIsAged poolMaxLifetime now entry
        then replaceWith AgingConnectionTerminationReason
        else
          if entryIsIdle poolMaxIdletime now entry
            then replaceWith IdlenessConnectionTerminationReason
            else onLiveConn reuseVar entry {entryUseTimeNSec = now}
      where
        -- The replacement inherits the slot of the closed connection,
        -- hence no capacity adjustment here.
        replaceWith reason = do
          Connection.release (entryConnection entry)
          observe (entryId entry) (TerminatedConnectionStatus reason)
          onNewConn reuseVar

    -- Run the user session on a live connection and decide what happens
    -- to the connection afterwards.
    onLiveConn reuseVar entry = do
      observe (entryId entry) InUseConnectionStatus
      sessRes <- try @SomeException (Session.run sess (entryConnection entry))
      case sessRes of
        Left exc -> do
          returnConn
          throwIO exc
        Right (Left err) ->
          ErrorsDestruction.reset
            ( \details -> do
                Connection.release (entryConnection entry)
                freeSlot
                observe (entryId entry) (networkFailure details)
                return $ Left $ SessionUsageError err
            )
            ( do
                returnConn
                observe
                  (entryId entry)
                  (ReadyForUseConnectionStatus (SessionFailedConnectionReadyForUseReason err))
                return $ Left $ SessionUsageError err
            )
            err
        Right (Right res) -> do
          returnConn
          observe (entryId entry) (ReadyForUseConnectionStatus SessionSucceededConnectionReadyForUseReason)
          return $ Right res
      where
        -- Put the connection back in the queue, unless 'release' has
        -- been called since it was taken, in which case close it.
        returnConn =
          join . atomically $ do
            reuse <- readTVar reuseVar
            if reuse
              then writeTQueue poolConnectionQueue entry $> return ()
              else return $ do
                Connection.release (entryConnection entry)
                freeSlot
                observe (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason)
-- | All the ways in which 'use' can fail.
data UsageError
  = -- | Establishing a new connection failed.
    ConnectionUsageError Connection.ConnectionError
  | -- | A session failed: either the pool's initialization session or
    -- the session passed to 'use'.
    SessionUsageError Session.SessionError
  | -- | No connection became available within the acquisition timeout.
    AcquisitionTimeoutUsageError
  deriving (Show, Eq)

instance Exception UsageError