{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Client.Run where
import Control.Concurrent.STM (check)
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import UnliftIO.Async
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.STM
import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2
data ClientConfig = ClientConfig
{ ClientConfig -> Scheme
scheme :: Scheme
, ClientConfig -> Authority
authority :: Authority
, ClientConfig -> StreamId
cacheLimit :: Int
, ClientConfig -> StreamId
connectionWindowSize :: WindowSize
, ClientConfig -> Settings
settings :: Settings
}
deriving (ClientConfig -> ClientConfig -> Bool
(ClientConfig -> ClientConfig -> Bool)
-> (ClientConfig -> ClientConfig -> Bool) -> Eq ClientConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ClientConfig -> ClientConfig -> Bool
== :: ClientConfig -> ClientConfig -> Bool
$c/= :: ClientConfig -> ClientConfig -> Bool
/= :: ClientConfig -> ClientConfig -> Bool
Eq, StreamId -> ClientConfig -> ShowS
[ClientConfig] -> ShowS
ClientConfig -> Authority
(StreamId -> ClientConfig -> ShowS)
-> (ClientConfig -> Authority)
-> ([ClientConfig] -> ShowS)
-> Show ClientConfig
forall a.
(StreamId -> a -> ShowS)
-> (a -> Authority) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> ClientConfig -> ShowS
showsPrec :: StreamId -> ClientConfig -> ShowS
$cshow :: ClientConfig -> Authority
show :: ClientConfig -> Authority
$cshowList :: [ClientConfig] -> ShowS
showList :: [ClientConfig] -> ShowS
Show)
defaultClientConfig :: ClientConfig
defaultClientConfig :: ClientConfig
defaultClientConfig =
ClientConfig
{ scheme :: Scheme
scheme = Scheme
"http"
, authority :: Authority
authority = Authority
"localhost"
, cacheLimit :: StreamId
cacheLimit = StreamId
64
, connectionWindowSize :: StreamId
connectionWindowSize = StreamId
defaultMaxData
, settings :: Settings
settings = Settings
defaultSettings
}
run :: ClientConfig -> Config -> Client a -> IO a
run :: forall a. ClientConfig -> Config -> Client a -> IO a
run cconf :: ClientConfig
cconf@ClientConfig{StreamId
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> StreamId
connectionWindowSize :: ClientConfig -> StreamId
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: StreamId
connectionWindowSize :: StreamId
settings :: Settings
..} Config
conf Client a
client = do
(Context
ctx, Manager
mgr) <- ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig
cconf Config
conf
Config -> Context -> Manager -> IO a -> IO a
forall a. Config -> Context -> Manager -> IO a -> IO a
runH2 Config
conf Context
ctx Manager
mgr (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ Context -> Manager -> IO a
runClient Context
ctx Manager
mgr
where
serverMaxStreams :: Context -> IO StreamId
serverMaxStreams Context
ctx = do
Maybe StreamId
mx <- Settings -> Maybe StreamId
maxConcurrentStreams (Settings -> Maybe StreamId) -> IO Settings -> IO (Maybe StreamId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef (Context -> IORef Settings
peerSettings Context
ctx)
case Maybe StreamId
mx of
Maybe StreamId
Nothing -> StreamId -> IO StreamId
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return StreamId
forall a. Bounded a => a
maxBound
Just StreamId
x -> StreamId -> IO StreamId
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return StreamId
x
possibleClientStream :: Context -> IO StreamId
possibleClientStream Context
ctx = do
StreamId
x <- Context -> IO StreamId
serverMaxStreams Context
ctx
StreamId
n <- OddStreamTable -> StreamId
oddConc (OddStreamTable -> StreamId) -> IO OddStreamTable -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar OddStreamTable -> IO OddStreamTable
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO (Context -> TVar OddStreamTable
oddStreamTable Context
ctx)
StreamId -> IO StreamId
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
x StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
- StreamId
n)
aux :: Context -> Aux
aux Context
ctx =
Aux
{ auxPossibleClientStreams :: IO StreamId
auxPossibleClientStreams = Context -> IO StreamId
possibleClientStream Context
ctx
}
clientCore :: Context -> Manager -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx Manager
mgr Request
req Response -> IO b
processResponse = do
Stream
strm <- Context -> Manager -> Scheme -> Authority -> Request -> IO Stream
sendRequest Context
ctx Manager
mgr Scheme
scheme Authority
authority Request
req
Response
rsp <- Stream -> IO Response
getResponse Stream
strm
Response -> IO b
processResponse Response
rsp
runClient :: Context -> Manager -> IO a
runClient Context
ctx Manager
mgr = do
a
x <- Client a
client (Context -> Manager -> Request -> (Response -> IO r) -> IO r
forall {b}.
Context -> Manager -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx Manager
mgr) (Aux -> IO a) -> Aux -> IO a
forall a b. (a -> b) -> a -> b
$ Context -> Aux
aux Context
ctx
Manager -> IO ()
waitCounter0 Manager
mgr
let frame :: Scheme
frame = StreamId -> ErrorCode -> Scheme -> Scheme
goawayFrame StreamId
0 ErrorCode
NoError Scheme
"graceful closing"
MVar ()
mvar <- () -> IO (MVar ())
forall (m :: * -> *) a. MonadIO m => a -> m (MVar a)
newMVar ()
TQueue Control -> Control -> IO ()
enqueueControl (Context -> TQueue Control
controlQ Context
ctx) (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ Scheme -> MVar () -> Control
CGoaway Scheme
frame MVar ()
mvar
MVar () -> IO ()
forall (m :: * -> *) a. MonadIO m => MVar a -> m a
takeMVar MVar ()
mvar
a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO :: forall a. ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO cconf :: ClientConfig
cconf@ClientConfig{StreamId
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> StreamId
connectionWindowSize :: ClientConfig -> StreamId
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: StreamId
connectionWindowSize :: StreamId
settings :: Settings
..} conf :: Config
conf@Config{StreamId
Buffer
Manager
SockAddr
StreamId -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confWriteBuffer :: Buffer
confBufferSize :: StreamId
confSendAll :: Scheme -> IO ()
confReadN :: StreamId -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
confWriteBuffer :: Config -> Buffer
confBufferSize :: Config -> StreamId
confSendAll :: Config -> Scheme -> IO ()
confReadN :: Config -> StreamId -> IO Scheme
confPositionReadMaker :: Config -> PositionReadMaker
confTimeoutManager :: Config -> Manager
confMySockAddr :: Config -> SockAddr
confPeerSockAddr :: Config -> SockAddr
..} ClientIO -> IO (IO a)
action = do
(ctx :: Context
ctx@Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
..}, Manager
mgr) <- ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig
cconf Config
conf
let putB :: Scheme -> IO ()
putB Scheme
bs = TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing [Scheme
bs]
putR :: Request -> IO (StreamId, Stream)
putR Request
req = do
Stream
strm <- Context -> Manager -> Scheme -> Authority -> Request -> IO Stream
sendRequest Context
ctx Manager
mgr Scheme
scheme Authority
authority Request
req
(StreamId, Stream) -> IO (StreamId, Stream)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream -> StreamId
streamNumber Stream
strm, Stream
strm)
get :: Stream -> IO Response
get = Stream -> IO Response
getResponse
create :: IO (StreamId, Stream)
create = Context -> IO (StreamId, Stream)
openOddStreamWait Context
ctx
IO a
runClient <-
ClientIO -> IO (IO a)
action (ClientIO -> IO (IO a)) -> ClientIO -> IO (IO a)
forall a b. (a -> b) -> a -> b
$ SockAddr
-> SockAddr
-> (Request -> IO (StreamId, Stream))
-> (Stream -> IO Response)
-> (Scheme -> IO ())
-> IO (StreamId, Stream)
-> ClientIO
ClientIO SockAddr
confMySockAddr SockAddr
confPeerSockAddr Request -> IO (StreamId, Stream)
putR Stream -> IO Response
get Scheme -> IO ()
putB IO (StreamId, Stream)
create
Config -> Context -> Manager -> IO a -> IO a
forall a. Config -> Context -> Manager -> IO a -> IO a
runH2 Config
conf Context
ctx Manager
mgr IO a
runClient
getResponse :: Stream -> IO Response
getResponse :: Stream -> IO Response
getResponse Stream
strm = do
Either SomeException InpObj
mRsp <- MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall (m :: * -> *) a. MonadIO m => MVar a -> m a
takeMVar (MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj))
-> MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall a b. (a -> b) -> a -> b
$ Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm
case Either SomeException InpObj
mRsp of
Left SomeException
err -> SomeException -> IO Response
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
err
Right InpObj
rsp -> Response -> IO Response
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Response -> IO Response) -> Response -> IO Response
forall a b. (a -> b) -> a -> b
$ InpObj -> Response
Response InpObj
rsp
setup :: ClientConfig -> Config -> IO (Context, Manager)
setup :: ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig{StreamId
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> StreamId
connectionWindowSize :: ClientConfig -> StreamId
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: StreamId
connectionWindowSize :: StreamId
settings :: Settings
..} conf :: Config
conf@Config{StreamId
Buffer
Manager
SockAddr
StreamId -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confWriteBuffer :: Config -> Buffer
confBufferSize :: Config -> StreamId
confSendAll :: Config -> Scheme -> IO ()
confReadN :: Config -> StreamId -> IO Scheme
confPositionReadMaker :: Config -> PositionReadMaker
confTimeoutManager :: Config -> Manager
confMySockAddr :: Config -> SockAddr
confPeerSockAddr :: Config -> SockAddr
confWriteBuffer :: Buffer
confBufferSize :: StreamId
confSendAll :: Scheme -> IO ()
confReadN :: StreamId -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
..} = do
let clientInfo :: RoleInfo
clientInfo = Scheme -> Authority -> RoleInfo
newClientInfo Scheme
scheme Authority
authority
Context
ctx <-
RoleInfo
-> Config -> StreamId -> StreamId -> Settings -> IO Context
newContext
RoleInfo
clientInfo
Config
conf
StreamId
cacheLimit
StreamId
connectionWindowSize
Settings
settings
Manager
mgr <- Manager -> IO Manager
start Manager
confTimeoutManager
Context -> IO ()
exchangeSettings Context
ctx
(Context, Manager) -> IO (Context, Manager)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Context
ctx, Manager
mgr)
runH2 :: Config -> Context -> Manager -> IO a -> IO a
runH2 :: forall a. Config -> Context -> Manager -> IO a -> IO a
runH2 Config
conf Context
ctx Manager
mgr IO a
runClient =
Manager
-> IO (Either () a)
-> (Either SomeException (Either () a) -> IO a)
-> IO a
forall a b.
Manager -> IO a -> (Either SomeException a -> IO b) -> IO b
stopAfter Manager
mgr (IO () -> IO a -> IO (Either () a)
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> m b -> m (Either a b)
race IO ()
runBackgroundThreads IO a
runClient) ((Either SomeException (Either () a) -> IO a) -> IO a)
-> (Either SomeException (Either () a) -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Either SomeException (Either () a)
res -> do
TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams (Context -> TVar OddStreamTable
oddStreamTable Context
ctx) (Context -> TVar EvenStreamTable
evenStreamTable Context
ctx) (Maybe SomeException -> IO ()) -> Maybe SomeException -> IO ()
forall a b. (a -> b) -> a -> b
$
(SomeException -> Maybe SomeException)
-> (Either () a -> Maybe SomeException)
-> Either SomeException (Either () a)
-> Maybe SomeException
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (Maybe SomeException -> Either () a -> Maybe SomeException
forall a b. a -> b -> a
const Maybe SomeException
forall a. Maybe a
Nothing) Either SomeException (Either () a)
res
case Either SomeException (Either () a)
res of
Left SomeException
err ->
SomeException -> IO a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
err
Right (Left ()) ->
IO a
forall a. HasCallStack => a
undefined
Right (Right a
x) ->
a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
where
runReceiver :: IO ()
runReceiver = Context -> Config -> IO ()
frameReceiver Context
ctx Config
conf
runSender :: IO ()
runSender = Context -> Config -> Manager -> IO ()
frameSender Context
ctx Config
conf Manager
mgr
runBackgroundThreads :: IO ()
runBackgroundThreads = IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m ()
concurrently_ IO ()
runReceiver IO ()
runSender
sendRequest
:: Context
-> Manager
-> Scheme
-> Authority
-> Request
-> IO Stream
sendRequest :: Context -> Manager -> Scheme -> Authority -> Request -> IO Stream
sendRequest ctx :: Context
ctx@Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
..} Manager
mgr Scheme
scheme Authority
auth (Request OutObj
req) = do
let hdr0 :: [Header]
hdr0 = OutObj -> [Header]
outObjHeaders OutObj
req
method :: Scheme
method = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"sendRequest:method") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":method" [Header]
hdr0
path :: Scheme
path = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"sendRequest:path") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":path" [Header]
hdr0
Maybe Stream
mstrm0 <- TVar EvenStreamTable -> Scheme -> Scheme -> IO (Maybe Stream)
lookupEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
case Maybe Stream
mstrm0 of
Just Stream
strm0 -> do
TVar EvenStreamTable -> Scheme -> Scheme -> IO ()
deleteEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
Stream -> IO Stream
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Stream
strm0
Maybe Stream
Nothing -> do
let hdr1, hdr2 :: [Header]
hdr1 :: [Header]
hdr1
| Scheme
scheme Scheme -> Scheme -> Bool
forall a. Eq a => a -> a -> Bool
/= Scheme
"" = (HeaderName
":scheme", Scheme
scheme) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr0
| Bool
otherwise = [Header]
hdr0
hdr2 :: [Header]
hdr2
| Authority
auth Authority -> Authority -> Bool
forall a. Eq a => a -> a -> Bool
/= Authority
"" = (HeaderName
":authority", Authority -> Scheme
UTF8.fromString Authority
auth) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr1
| Bool
otherwise = [Header]
hdr1
req' :: OutObj
req' = OutObj
req{outObjHeaders = hdr2}
(StreamId
sid, Stream
newstrm) <- Context -> IO (StreamId, Stream)
openOddStreamWait Context
ctx
case OutObj -> OutBody
outObjBody OutObj
req of
OutBodyStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy ->
Context
-> Manager
-> OutObj
-> StreamId
-> Stream
-> (OutBodyIface -> IO ())
-> IO ()
sendStreaming Context
ctx Manager
mgr OutObj
req' StreamId
sid Stream
newstrm ((OutBodyIface -> IO ()) -> IO ())
-> (OutBodyIface -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \OutBodyIface
iface ->
OutBodyIface -> forall x. IO x -> IO x
outBodyUnmask OutBodyIface
iface (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (Builder -> IO ()) -> IO () -> IO ()
strmbdy (OutBodyIface -> Builder -> IO ()
outBodyPush OutBodyIface
iface) (OutBodyIface -> IO ()
outBodyFlush OutBodyIface
iface)
OutBodyStreamingUnmask OutBodyIface -> IO ()
strmbdy ->
Context
-> Manager
-> OutObj
-> StreamId
-> Stream
-> (OutBodyIface -> IO ())
-> IO ()
sendStreaming Context
ctx Manager
mgr OutObj
req' StreamId
sid Stream
newstrm OutBodyIface -> IO ()
strmbdy
OutBody
_ -> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
StreamId
sidOK <- TVar StreamId -> STM StreamId
forall a. TVar a -> STM a
readTVar TVar StreamId
outputQStreamID
Bool -> STM ()
check (StreamId
sidOK StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== StreamId
sid)
TVar StreamId -> StreamId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamId
outputQStreamID (StreamId
sid StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
2)
TQueue (Output Stream) -> Output Stream -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Output Stream)
outputQ (Output Stream -> STM ()) -> Output Stream -> STM ()
forall a b. (a -> b) -> a -> b
$ Stream
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output Stream
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output Stream
newstrm OutObj
req' OutputType
OObj Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
Stream -> IO Stream
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Stream
newstrm
sendStreaming
:: Context
-> Manager
-> OutObj
-> StreamId
-> Stream
-> (OutBodyIface -> IO ())
-> IO ()
sendStreaming :: Context
-> Manager
-> OutObj
-> StreamId
-> Stream
-> (OutBodyIface -> IO ())
-> IO ()
sendStreaming Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
..} Manager
mgr OutObj
req StreamId
sid Stream
newstrm OutBodyIface -> IO ()
strmbdy = do
TBQueue StreamingChunk
tbq <- Natural -> IO (TBQueue StreamingChunk)
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
10
Manager -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask Manager
mgr (((forall x. IO x -> IO x) -> IO ()) -> IO ())
-> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \forall x. IO x -> IO x
unmask -> do
IORef Bool
decrementedCounter <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
let decCounterOnce :: IO ()
decCounterOnce = do
Bool
alreadyDecremented <- IORef Bool -> (Bool -> (Bool, Bool)) -> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef Bool
decrementedCounter ((Bool -> (Bool, Bool)) -> IO Bool)
-> (Bool -> (Bool, Bool)) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \Bool
b -> (Bool
True, Bool
b)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyDecremented (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Manager -> IO ()
decCounter Manager
mgr
let iface :: OutBodyIface
iface = OutBodyIface {
outBodyUnmask :: forall x. IO x -> IO x
outBodyUnmask = IO x -> IO x
forall x. IO x -> IO x
unmask
, outBodyPush :: Builder -> IO ()
outBodyPush = \Builder
b -> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Builder -> Maybe (IO ()) -> StreamingChunk
StreamingBuilder Builder
b Maybe (IO ())
forall a. Maybe a
Nothing)
, outBodyPushFinal :: Builder -> IO ()
outBodyPushFinal = \Builder
b -> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Builder -> Maybe (IO ()) -> StreamingChunk
StreamingBuilder Builder
b (IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just IO ()
decCounterOnce))
, outBodyFlush :: IO ()
outBodyFlush = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
}
finished :: IO ()
finished = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ IO () -> StreamingChunk
StreamingFinished IO ()
decCounterOnce
Manager -> IO ()
incCounter Manager
mgr
OutBodyIface -> IO ()
strmbdy OutBodyIface
iface IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`finally` IO ()
finished
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
StreamId
sidOK <- TVar StreamId -> STM StreamId
forall a. TVar a -> STM a
readTVar TVar StreamId
outputQStreamID
Bool -> STM ()
check (StreamId
sidOK StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== StreamId
sid)
TVar StreamId -> StreamId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamId
outputQStreamID (StreamId
sid StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
2)
TQueue (Output Stream) -> Output Stream -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Output Stream)
outputQ (Output Stream -> STM ()) -> Output Stream -> STM ()
forall a b. (a -> b) -> a -> b
$ Stream
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output Stream
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output Stream
newstrm OutObj
req OutputType
OObj (TBQueue StreamingChunk -> Maybe (TBQueue StreamingChunk)
forall a. a -> Maybe a
Just TBQueue StreamingChunk
tbq) (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
exchangeSettings :: Context -> IO ()
exchangeSettings :: Context -> IO ()
exchangeSettings Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
..} = do
StreamId
connRxWS <- RxFlow -> StreamId
rxfBufSize (RxFlow -> StreamId) -> IO RxFlow -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef RxFlow -> IO RxFlow
forall a. IORef a -> IO a
readIORef IORef RxFlow
rxFlow
let frames :: [Scheme]
frames = Settings -> StreamId -> [Scheme]
makeNegotiationFrames Settings
mySettings StreamId
connRxWS
setframe :: Control
setframe = Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing (Scheme
connectionPreface Scheme -> [Scheme] -> [Scheme]
forall a. a -> [a] -> [a]
: [Scheme]
frames)
IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
myFirstSettings Bool
True
TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ Control
setframe
data ClientIO = ClientIO
{ ClientIO -> SockAddr
cioMySockAddr :: SockAddr
, ClientIO -> SockAddr
cioPeerSockAddr :: SockAddr
, ClientIO -> Request -> IO (StreamId, Stream)
cioWriteRequest :: Request -> IO (StreamId, Stream)
, ClientIO -> Stream -> IO Response
cioReadResponse :: Stream -> IO Response
, ClientIO -> Scheme -> IO ()
cioWriteBytes :: ByteString -> IO ()
, ClientIO -> IO (StreamId, Stream)
cioCreateStream :: IO (StreamId, Stream)
}