{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Client.Run where

import Control.Concurrent.STM (check)
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import UnliftIO.Async
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.STM

import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2

-- | Client configuration
data ClientConfig = ClientConfig
    { ClientConfig -> Scheme
scheme :: Scheme
    -- ^ https or http
    , ClientConfig -> Authority
authority :: Authority
    -- ^ Server name
    , ClientConfig -> StreamId
cacheLimit :: Int
    -- ^ The maximum number of incoming streams on the net
    , ClientConfig -> StreamId
connectionWindowSize :: WindowSize
    -- ^ The window size of connection.
    , ClientConfig -> Settings
settings :: Settings
    -- ^ Settings
    }
    deriving (ClientConfig -> ClientConfig -> Bool
(ClientConfig -> ClientConfig -> Bool)
-> (ClientConfig -> ClientConfig -> Bool) -> Eq ClientConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ClientConfig -> ClientConfig -> Bool
== :: ClientConfig -> ClientConfig -> Bool
$c/= :: ClientConfig -> ClientConfig -> Bool
/= :: ClientConfig -> ClientConfig -> Bool
Eq, StreamId -> ClientConfig -> ShowS
[ClientConfig] -> ShowS
ClientConfig -> Authority
(StreamId -> ClientConfig -> ShowS)
-> (ClientConfig -> Authority)
-> ([ClientConfig] -> ShowS)
-> Show ClientConfig
forall a.
(StreamId -> a -> ShowS)
-> (a -> Authority) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> ClientConfig -> ShowS
showsPrec :: StreamId -> ClientConfig -> ShowS
$cshow :: ClientConfig -> Authority
show :: ClientConfig -> Authority
$cshowList :: [ClientConfig] -> ShowS
showList :: [ClientConfig] -> ShowS
Show)

-- | The default client config.
--
-- The @authority@ field will be used to set the HTTP2 @:authority@
-- pseudo-header. In most cases you will want to override it to be equal to
-- @host@.
--
-- Further background on @authority@:
-- [RFC 3986](https://datatracker.ietf.org/doc/html/rfc3986#section-3.2) also
-- allows @host:port@, and most servers will accept this too. However, when
-- using TLS, many servers will expect the TLS SNI server name and the
-- @:authority@ pseudo-header to be equal, and for TLS SNI the server name
-- should not include the port. Note that HTTP2 explicitly /disallows/ using
-- @userinfo\@@ as part of the authority.
--
-- >>> defaultClientConfig
-- ClientConfig {scheme = "http", authority = "localhost", cacheLimit = 64, connectionWindowSize = 1048576, settings = Settings {headerTableSize = 4096, enablePush = True, maxConcurrentStreams = Just 64, initialWindowSize = 262144, maxFrameSize = 16384, maxHeaderListSize = Nothing, pingRateLimit = 10}}
defaultClientConfig :: ClientConfig
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme :: Scheme
scheme = Scheme
"http"
        , authority :: Authority
authority = Authority
"localhost"
        , cacheLimit :: StreamId
cacheLimit = StreamId
64
        , connectionWindowSize :: StreamId
connectionWindowSize = StreamId
defaultMaxData
        , settings :: Settings
settings = Settings
defaultSettings
        }

-- | Running HTTP/2 client.
run :: ClientConfig -> Config -> Client a -> IO a
run :: forall a. ClientConfig -> Config -> Client a -> IO a
run cconf :: ClientConfig
cconf@ClientConfig{StreamId
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> StreamId
connectionWindowSize :: ClientConfig -> StreamId
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: StreamId
connectionWindowSize :: StreamId
settings :: Settings
..} Config
conf Client a
client = do
    (Context
ctx, Manager
mgr) <- ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig
cconf Config
conf
    Config -> Context -> Manager -> IO a -> IO a
forall a. Config -> Context -> Manager -> IO a -> IO a
runH2 Config
conf Context
ctx Manager
mgr (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ Context -> Manager -> IO a
runClient Context
ctx Manager
mgr
  where
    serverMaxStreams :: Context -> IO StreamId
serverMaxStreams Context
ctx = do
        Maybe StreamId
mx <- Settings -> Maybe StreamId
maxConcurrentStreams (Settings -> Maybe StreamId) -> IO Settings -> IO (Maybe StreamId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef (Context -> IORef Settings
peerSettings Context
ctx)
        case Maybe StreamId
mx of
            Maybe StreamId
Nothing -> StreamId -> IO StreamId
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return StreamId
forall a. Bounded a => a
maxBound
            Just StreamId
x -> StreamId -> IO StreamId
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return StreamId
x
    possibleClientStream :: Context -> IO StreamId
possibleClientStream Context
ctx = do
        StreamId
x <- Context -> IO StreamId
serverMaxStreams Context
ctx
        StreamId
n <- OddStreamTable -> StreamId
oddConc (OddStreamTable -> StreamId) -> IO OddStreamTable -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar OddStreamTable -> IO OddStreamTable
forall (m :: * -> *) a. MonadIO m => TVar a -> m a
readTVarIO (Context -> TVar OddStreamTable
oddStreamTable Context
ctx)
        StreamId -> IO StreamId
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
x StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
- StreamId
n)
    aux :: Context -> Aux
aux Context
ctx =
        Aux
            { auxPossibleClientStreams :: IO StreamId
auxPossibleClientStreams = Context -> IO StreamId
possibleClientStream Context
ctx
            }
    clientCore :: Context -> Manager -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx Manager
mgr Request
req Response -> IO b
processResponse = do
        Stream
strm <- Context -> Manager -> Scheme -> Authority -> Request -> IO Stream
sendRequest Context
ctx Manager
mgr Scheme
scheme Authority
authority Request
req
        Response
rsp <- Stream -> IO Response
getResponse Stream
strm
        Response -> IO b
processResponse Response
rsp
    runClient :: Context -> Manager -> IO a
runClient Context
ctx Manager
mgr = do
        a
x <- Client a
client (Context -> Manager -> Request -> (Response -> IO r) -> IO r
forall {b}.
Context -> Manager -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx Manager
mgr) (Aux -> IO a) -> Aux -> IO a
forall a b. (a -> b) -> a -> b
$ Context -> Aux
aux Context
ctx
        Manager -> IO ()
waitCounter0 Manager
mgr
        let frame :: Scheme
frame = StreamId -> ErrorCode -> Scheme -> Scheme
goawayFrame StreamId
0 ErrorCode
NoError Scheme
"graceful closing"
        MVar ()
mvar <- () -> IO (MVar ())
forall (m :: * -> *) a. MonadIO m => a -> m (MVar a)
newMVar ()
        TQueue Control -> Control -> IO ()
enqueueControl (Context -> TQueue Control
controlQ Context
ctx) (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ Scheme -> MVar () -> Control
CGoaway Scheme
frame MVar ()
mvar
        MVar () -> IO ()
forall (m :: * -> *) a. MonadIO m => MVar a -> m a
takeMVar MVar ()
mvar
        a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x

-- | Launching a receiver and a sender.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO :: forall a. ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO cconf :: ClientConfig
cconf@ClientConfig{StreamId
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> StreamId
connectionWindowSize :: ClientConfig -> StreamId
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: StreamId
connectionWindowSize :: StreamId
settings :: Settings
..} conf :: Config
conf@Config{StreamId
Buffer
Manager
SockAddr
StreamId -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confWriteBuffer :: Buffer
confBufferSize :: StreamId
confSendAll :: Scheme -> IO ()
confReadN :: StreamId -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
confWriteBuffer :: Config -> Buffer
confBufferSize :: Config -> StreamId
confSendAll :: Config -> Scheme -> IO ()
confReadN :: Config -> StreamId -> IO Scheme
confPositionReadMaker :: Config -> PositionReadMaker
confTimeoutManager :: Config -> Manager
confMySockAddr :: Config -> SockAddr
confPeerSockAddr :: Config -> SockAddr
..} ClientIO -> IO (IO a)
action = do
    (ctx :: Context
ctx@Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
..}, Manager
mgr) <- ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig
cconf Config
conf
    let putB :: Scheme -> IO ()
putB Scheme
bs = TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing [Scheme
bs]
        putR :: Request -> IO (StreamId, Stream)
putR Request
req = do
            Stream
strm <- Context -> Manager -> Scheme -> Authority -> Request -> IO Stream
sendRequest Context
ctx Manager
mgr Scheme
scheme Authority
authority Request
req
            (StreamId, Stream) -> IO (StreamId, Stream)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream -> StreamId
streamNumber Stream
strm, Stream
strm)
        get :: Stream -> IO Response
get = Stream -> IO Response
getResponse
        create :: IO (StreamId, Stream)
create = Context -> IO (StreamId, Stream)
openOddStreamWait Context
ctx
    IO a
runClient <-
        ClientIO -> IO (IO a)
action (ClientIO -> IO (IO a)) -> ClientIO -> IO (IO a)
forall a b. (a -> b) -> a -> b
$ SockAddr
-> SockAddr
-> (Request -> IO (StreamId, Stream))
-> (Stream -> IO Response)
-> (Scheme -> IO ())
-> IO (StreamId, Stream)
-> ClientIO
ClientIO SockAddr
confMySockAddr SockAddr
confPeerSockAddr Request -> IO (StreamId, Stream)
putR Stream -> IO Response
get Scheme -> IO ()
putB IO (StreamId, Stream)
create
    Config -> Context -> Manager -> IO a -> IO a
forall a. Config -> Context -> Manager -> IO a -> IO a
runH2 Config
conf Context
ctx Manager
mgr IO a
runClient

getResponse :: Stream -> IO Response
getResponse :: Stream -> IO Response
getResponse Stream
strm = do
    Either SomeException InpObj
mRsp <- MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall (m :: * -> *) a. MonadIO m => MVar a -> m a
takeMVar (MVar (Either SomeException InpObj)
 -> IO (Either SomeException InpObj))
-> MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall a b. (a -> b) -> a -> b
$ Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm
    case Either SomeException InpObj
mRsp of
        Left SomeException
err -> SomeException -> IO Response
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
err
        Right InpObj
rsp -> Response -> IO Response
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Response -> IO Response) -> Response -> IO Response
forall a b. (a -> b) -> a -> b
$ InpObj -> Response
Response InpObj
rsp

setup :: ClientConfig -> Config -> IO (Context, Manager)
setup :: ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig{StreamId
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> StreamId
connectionWindowSize :: ClientConfig -> StreamId
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: StreamId
connectionWindowSize :: StreamId
settings :: Settings
..} conf :: Config
conf@Config{StreamId
Buffer
Manager
SockAddr
StreamId -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confWriteBuffer :: Config -> Buffer
confBufferSize :: Config -> StreamId
confSendAll :: Config -> Scheme -> IO ()
confReadN :: Config -> StreamId -> IO Scheme
confPositionReadMaker :: Config -> PositionReadMaker
confTimeoutManager :: Config -> Manager
confMySockAddr :: Config -> SockAddr
confPeerSockAddr :: Config -> SockAddr
confWriteBuffer :: Buffer
confBufferSize :: StreamId
confSendAll :: Scheme -> IO ()
confReadN :: StreamId -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
..} = do
    let clientInfo :: RoleInfo
clientInfo = Scheme -> Authority -> RoleInfo
newClientInfo Scheme
scheme Authority
authority
    Context
ctx <-
        RoleInfo
-> Config -> StreamId -> StreamId -> Settings -> IO Context
newContext
            RoleInfo
clientInfo
            Config
conf
            StreamId
cacheLimit
            StreamId
connectionWindowSize
            Settings
settings
    Manager
mgr <- Manager -> IO Manager
start Manager
confTimeoutManager
    Context -> IO ()
exchangeSettings Context
ctx
    (Context, Manager) -> IO (Context, Manager)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Context
ctx, Manager
mgr)

runH2 :: Config -> Context -> Manager -> IO a -> IO a
runH2 :: forall a. Config -> Context -> Manager -> IO a -> IO a
runH2 Config
conf Context
ctx Manager
mgr IO a
runClient =
    Manager
-> IO (Either () a)
-> (Either SomeException (Either () a) -> IO a)
-> IO a
forall a b.
Manager -> IO a -> (Either SomeException a -> IO b) -> IO b
stopAfter Manager
mgr (IO () -> IO a -> IO (Either () a)
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> m b -> m (Either a b)
race IO ()
runBackgroundThreads IO a
runClient) ((Either SomeException (Either () a) -> IO a) -> IO a)
-> (Either SomeException (Either () a) -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Either SomeException (Either () a)
res -> do
        TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams (Context -> TVar OddStreamTable
oddStreamTable Context
ctx) (Context -> TVar EvenStreamTable
evenStreamTable Context
ctx) (Maybe SomeException -> IO ()) -> Maybe SomeException -> IO ()
forall a b. (a -> b) -> a -> b
$
            (SomeException -> Maybe SomeException)
-> (Either () a -> Maybe SomeException)
-> Either SomeException (Either () a)
-> Maybe SomeException
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just (Maybe SomeException -> Either () a -> Maybe SomeException
forall a b. a -> b -> a
const Maybe SomeException
forall a. Maybe a
Nothing) Either SomeException (Either () a)
res
        case Either SomeException (Either () a)
res of
            Left SomeException
err ->
                SomeException -> IO a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
err
            Right (Left ()) ->
                IO a
forall a. HasCallStack => a
undefined -- never reach
            Right (Right a
x) ->
                a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
  where
    runReceiver :: IO ()
runReceiver = Context -> Config -> IO ()
frameReceiver Context
ctx Config
conf
    runSender :: IO ()
runSender = Context -> Config -> Manager -> IO ()
frameSender Context
ctx Config
conf Manager
mgr
    runBackgroundThreads :: IO ()
runBackgroundThreads = IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m ()
concurrently_ IO ()
runReceiver IO ()
runSender

sendRequest
    :: Context
    -> Manager
    -> Scheme
    -> Authority
    -> Request
    -> IO Stream
sendRequest :: Context -> Manager -> Scheme -> Authority -> Request -> IO Stream
sendRequest ctx :: Context
ctx@Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
..} Manager
mgr Scheme
scheme Authority
auth (Request OutObj
req) = do
    -- Checking push promises
    let hdr0 :: [Header]
hdr0 = OutObj -> [Header]
outObjHeaders OutObj
req
        method :: Scheme
method = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"sendRequest:method") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":method" [Header]
hdr0
        path :: Scheme
path = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"sendRequest:path") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":path" [Header]
hdr0
    Maybe Stream
mstrm0 <- TVar EvenStreamTable -> Scheme -> Scheme -> IO (Maybe Stream)
lookupEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
    case Maybe Stream
mstrm0 of
        Just Stream
strm0 -> do
            TVar EvenStreamTable -> Scheme -> Scheme -> IO ()
deleteEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
            Stream -> IO Stream
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Stream
strm0
        Maybe Stream
Nothing -> do
            -- Arch/Sender is originally implemented for servers where
            -- the ordering of responses can be out-of-order.
            -- But for clients, the ordering must be maintained.
            -- To implement this, 'outputQStreamID' is used.
            let hdr1, hdr2 :: [Header]
                hdr1 :: [Header]
hdr1
                    | Scheme
scheme Scheme -> Scheme -> Bool
forall a. Eq a => a -> a -> Bool
/= Scheme
"" = (HeaderName
":scheme", Scheme
scheme) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr0
                    | Bool
otherwise = [Header]
hdr0
                hdr2 :: [Header]
hdr2
                    | Authority
auth Authority -> Authority -> Bool
forall a. Eq a => a -> a -> Bool
/= Authority
"" = (HeaderName
":authority", Authority -> Scheme
UTF8.fromString Authority
auth) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr1
                    | Bool
otherwise = [Header]
hdr1
                req' :: OutObj
req' = OutObj
req{outObjHeaders = hdr2}
            -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
            (StreamId
sid, Stream
newstrm) <- Context -> IO (StreamId, Stream)
openOddStreamWait Context
ctx
            case OutObj -> OutBody
outObjBody OutObj
req of
                OutBodyStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy ->
                    Context
-> Manager
-> OutObj
-> StreamId
-> Stream
-> (OutBodyIface -> IO ())
-> IO ()
sendStreaming Context
ctx Manager
mgr OutObj
req' StreamId
sid Stream
newstrm ((OutBodyIface -> IO ()) -> IO ())
-> (OutBodyIface -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \OutBodyIface
iface ->
                        OutBodyIface -> forall x. IO x -> IO x
outBodyUnmask OutBodyIface
iface (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (Builder -> IO ()) -> IO () -> IO ()
strmbdy (OutBodyIface -> Builder -> IO ()
outBodyPush OutBodyIface
iface) (OutBodyIface -> IO ()
outBodyFlush OutBodyIface
iface)
                OutBodyStreamingUnmask OutBodyIface -> IO ()
strmbdy ->
                    Context
-> Manager
-> OutObj
-> StreamId
-> Stream
-> (OutBodyIface -> IO ())
-> IO ()
sendStreaming Context
ctx Manager
mgr OutObj
req' StreamId
sid Stream
newstrm OutBodyIface -> IO ()
strmbdy
                OutBody
_ -> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                    StreamId
sidOK <- TVar StreamId -> STM StreamId
forall a. TVar a -> STM a
readTVar TVar StreamId
outputQStreamID
                    Bool -> STM ()
check (StreamId
sidOK StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== StreamId
sid)
                    TVar StreamId -> StreamId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamId
outputQStreamID (StreamId
sid StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
2)
                    TQueue (Output Stream) -> Output Stream -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Output Stream)
outputQ (Output Stream -> STM ()) -> Output Stream -> STM ()
forall a b. (a -> b) -> a -> b
$ Stream
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output Stream
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output Stream
newstrm OutObj
req' OutputType
OObj Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
            Stream -> IO Stream
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Stream
newstrm

sendStreaming
    :: Context
    -> Manager
    -> OutObj
    -> StreamId
    -> Stream
    -> (OutBodyIface -> IO ())
    -> IO ()
sendStreaming :: Context
-> Manager
-> OutObj
-> StreamId
-> Stream
-> (OutBodyIface -> IO ())
-> IO ()
sendStreaming Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
..} Manager
mgr OutObj
req StreamId
sid Stream
newstrm OutBodyIface -> IO ()
strmbdy = do
    TBQueue StreamingChunk
tbq <- Natural -> IO (TBQueue StreamingChunk)
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO Natural
10 -- fixme: hard coding: 10
    Manager -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask Manager
mgr (((forall x. IO x -> IO x) -> IO ()) -> IO ())
-> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \forall x. IO x -> IO x
unmask -> do
        IORef Bool
decrementedCounter <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
        let decCounterOnce :: IO ()
decCounterOnce = do
              Bool
alreadyDecremented <- IORef Bool -> (Bool -> (Bool, Bool)) -> IO Bool
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef Bool
decrementedCounter ((Bool -> (Bool, Bool)) -> IO Bool)
-> (Bool -> (Bool, Bool)) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \Bool
b -> (Bool
True, Bool
b)
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
alreadyDecremented (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Manager -> IO ()
decCounter Manager
mgr
        let iface :: OutBodyIface
iface = OutBodyIface {
                outBodyUnmask :: forall x. IO x -> IO x
outBodyUnmask = IO x -> IO x
forall x. IO x -> IO x
unmask
              , outBodyPush :: Builder -> IO ()
outBodyPush = \Builder
b -> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Builder -> Maybe (IO ()) -> StreamingChunk
StreamingBuilder Builder
b Maybe (IO ())
forall a. Maybe a
Nothing)
              , outBodyPushFinal :: Builder -> IO ()
outBodyPushFinal = \Builder
b -> STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Builder -> Maybe (IO ()) -> StreamingChunk
StreamingBuilder Builder
b (IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just IO ()
decCounterOnce))
              , outBodyFlush :: IO ()
outBodyFlush = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
              }
            finished :: IO ()
finished = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ IO () -> StreamingChunk
StreamingFinished IO ()
decCounterOnce
        Manager -> IO ()
incCounter Manager
mgr
        OutBodyIface -> IO ()
strmbdy OutBodyIface
iface IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`finally` IO ()
finished
    STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        StreamId
sidOK <- TVar StreamId -> STM StreamId
forall a. TVar a -> STM a
readTVar TVar StreamId
outputQStreamID
        Bool -> STM ()
check (StreamId
sidOK StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== StreamId
sid)
        TVar StreamId -> StreamId -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar StreamId
outputQStreamID (StreamId
sid StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
2)
        TQueue (Output Stream) -> Output Stream -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Output Stream)
outputQ (Output Stream -> STM ()) -> Output Stream -> STM ()
forall a b. (a -> b) -> a -> b
$ Stream
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output Stream
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output Stream
newstrm OutObj
req OutputType
OObj (TBQueue StreamingChunk -> Maybe (TBQueue StreamingChunk)
forall a. a -> Maybe a
Just TBQueue StreamingChunk
tbq) (() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ())

exchangeSettings :: Context -> IO ()
exchangeSettings :: Context -> IO ()
exchangeSettings Context{TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue (Output Stream)
DynamicTable
Settings
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
controlQ :: Context -> TQueue Control
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQ :: Context -> TQueue (Output Stream)
outputQStreamID :: Context -> TVar StreamId
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue (Output Stream)
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
..} = do
    StreamId
connRxWS <- RxFlow -> StreamId
rxfBufSize (RxFlow -> StreamId) -> IO RxFlow -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef RxFlow -> IO RxFlow
forall a. IORef a -> IO a
readIORef IORef RxFlow
rxFlow
    let frames :: [Scheme]
frames = Settings -> StreamId -> [Scheme]
makeNegotiationFrames Settings
mySettings StreamId
connRxWS
        setframe :: Control
setframe = Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing (Scheme
connectionPreface Scheme -> [Scheme] -> [Scheme]
forall a. a -> [a] -> [a]
: [Scheme]
frames)
    IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
myFirstSettings Bool
True
    TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ Control
setframe

data ClientIO = ClientIO
    { ClientIO -> SockAddr
cioMySockAddr :: SockAddr
    , ClientIO -> SockAddr
cioPeerSockAddr :: SockAddr
    , ClientIO -> Request -> IO (StreamId, Stream)
cioWriteRequest :: Request -> IO (StreamId, Stream)
    , ClientIO -> Stream -> IO Response
cioReadResponse :: Stream -> IO Response
    , ClientIO -> Scheme -> IO ()
cioWriteBytes :: ByteString -> IO ()
    , ClientIO -> IO (StreamId, Stream)
cioCreateStream :: IO (StreamId, Stream)
    }