{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    worker,
    WorkerConf (..),
    fromContext,
) where

import Data.IORef
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import Network.HTTP.Semantics.Server
import Network.HTTP.Semantics.Server.Internal
import Network.HTTP.Types
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException (..))
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports hiding (insert)
import Network.HTTP2.Frame
import Network.HTTP2.H2

----------------------------------------------------------------

-- | The operations and connection facts a worker needs, abstracted over
--   the stream type @a@ so that the worker code does not touch the
--   connection state directly.
data WorkerConf a = WorkerConf
    { readInputQ :: IO (Input a)
    -- ^ Take the next request ('Input') to be served.
    , writeOutputQ :: Output a -> IO ()
    -- ^ Enqueue an 'Output' to be sent.
    , workerCleanup :: a -> IO ()
    -- ^ Clean up the stream that was being served when the worker
    --   was interrupted by an exception.
    , isPushable :: IO Bool
    -- ^ Whether server push may be used at the moment.
    , makePushStream :: a -> PushPromise -> IO (StreamId, a)
    -- ^ Given the parent stream and a promise, produce the stream id to
    --   announce the promise with and a fresh stream for the pushed response.
    , mySockAddr :: SockAddr
    -- ^ Local address, handed to the application through 'Aux'.
    , peerSockAddr :: SockAddr
    -- ^ Peer address, handed to the application through 'Aux'.
    }

-- | Build the 'WorkerConf' for 'Stream's out of a connection 'Context'.
fromContext :: Context -> WorkerConf Stream
fromContext ctx@Context{..} =
    WorkerConf
        { readInputQ = atomically $ readTQueue $ inputQ $ toServerInfo roleInfo
        , writeOutputQ = enqueueOutput outputQ
        , -- Mark the stream as killed and tell the peer with RST_STREAM
          -- (InternalError).
          workerCleanup = \strm -> do
            closed ctx strm Killed
            let frame = resetFrame InternalError $ streamNumber strm
            enqueueControl controlQ $ CFrames Nothing [frame]
        , -- Peer SETTINGS_ENABLE_PUSH
          isPushable = enablePush <$> readIORef peerSettings
        , -- Peer SETTINGS_INITIAL_WINDOW_SIZE
          makePushStream = \pstrm _ -> do
            -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
            (_, newstrm) <- openEvenStreamWait ctx
            -- The id returned to the caller is the parent stream's number;
            -- the id allocated together with the new stream is not used here.
            let pid = streamNumber pstrm
            return (pid, newstrm)
        , mySockAddr = mySockAddr
        , peerSockAddr = peerSockAddr
        }

----------------------------------------------------------------

-- | Enqueue the given push promises on behalf of a parent stream and
--   tell the caller how the main response has to be scheduled.
--
--   The result is 'OObj' when nothing was pushed: the promise list is
--   empty, 'isPushable' says no, or the request carries no scheme or no
--   authority\/host from which the promised request could be built.
--   Otherwise the result is 'OWait' carrying an action that blocks until
--   the completion action of every enqueued push has run.
pushStream
    :: WorkerConf a
    -> a -- parent stream
    -> ValueTable -- request
    -> [PushPromise]
    -> IO OutputType
pushStream _ _ _ [] = return OObj
pushStream WorkerConf{..} pstrm reqvt pps0 = do
    pushable <- isPushable
    case promiseOrigin of
        Just origin | pushable -> do
            tvar <- newTVarIO 0
            -- 'pps0' is non-empty in this equation, so 'lim' is at least 1.
            lim <- push origin tvar pps0 0
            return $ OWait (waiter lim tvar)
        _ -> return OObj
  where
    -- Scheme and authority of the original request, reused for every
    -- promised request. Either can be absent; resolving them here, once,
    -- avoids a partial 'fromJust' whose failure would only surface when
    -- the queued header list is eventually consumed.
    promiseOrigin =
        (,)
            <$> getFieldValue tokenScheme reqvt
            <*> ( getFieldValue tokenAuthority reqvt
                    <|> getFieldValue tokenHost reqvt
                )
    -- Completion action attached to each pushed output.
    increment tvar = atomically $ modifyTVar' tvar (+ 1)
    -- Block until the counter has reached the number of enqueued pushes.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        checkSTM (n >= lim)
    -- Enqueue one output per promise; returns how many were enqueued.
    push _ _ [] n = return (n :: Int)
    push origin@(scheme, auth) tvar (pp : pps) n = do
        (pid, newstrm) <- makePushStream pstrm pp
        let promiseRequest =
                [ (tokenMethod, methodGet)
                , (tokenScheme, scheme)
                , (tokenAuthority, auth)
                , (tokenPath, promiseRequestPath pp)
                ]
            ot = OPush promiseRequest pid
            Response rsp = promiseResponse pp
            out = Output newstrm rsp ot Nothing $ increment tvar
        writeOutputQ out
        push origin tvar pps (n + 1)

-- | The responder handed to a server application by its worker.
--   The application calls it with its 'Response' and push promises;
--   this function enqueues the corresponding 'Output' for the HTTP/2
--   sender and records, through the 'ThreadContinue', whether the worker
--   should go on serving requests afterwards.
response
    :: WorkerConf a
    -> Manager
    -> T.Handle
    -> ThreadContinue
    -> a
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
    case outObjBody rsp of
        -- No body: push promises are not processed for this case.
        OutBodyNone -> do
            setThreadContinue tconf True
            enqueue OObj Nothing
        OutBodyBuilder _ -> pushThenEnqueue
        OutBodyFile _ -> pushThenEnqueue
        OutBodyStreaming strmbdy -> do
            otyp <- pushStream wc strm reqvt pps
            -- We must not exit this server application.
            -- If the application exits, streaming would be also closed.
            -- So, this work occupies this thread.
            --
            -- We need to increase the number of workers.
            spawnAction mgr
            -- After this work, this thread stops to decrease
            -- the number of workers.
            setThreadContinue tconf False
            -- Since streaming body is loop, we cannot control it.
            -- So, let's serialize 'Builder' with a designated queue.
            tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
            enqueue otyp (Just tbq)
            let -- The timeout handle is paused while the write may block
                -- on a full queue.
                push b = do
                    T.pause th
                    atomically $ writeTBQueue tbq (StreamingBuilder b Nothing)
                    T.resume th
                flush = atomically $ writeTBQueue tbq StreamingFlush
                finished =
                    atomically $
                        writeTBQueue tbq $
                            StreamingFinished (decCounter mgr)
            incCounter mgr
            -- 'finished' is enqueued even if the streaming body throws.
            strmbdy push flush `E.finally` finished
        OutBodyStreamingUnmask _ ->
            error "response: server does not support OutBodyStreamingUnmask"
  where
    (_, reqvt) = inpObjHeaders req
    -- Enqueue the response on this stream with no completion action.
    enqueue otyp mtbq = writeOutputQ $ Output strm rsp otyp mtbq (return ())
    -- Common path for bodies that are fully described by the 'OutObj':
    -- enqueue pushes first, then the response itself.
    pushThenEnqueue = do
        otyp <- pushStream wc strm reqvt pps
        setThreadContinue tconf True
        enqueue otyp Nothing

-- | Worker for server applications.
--
--   Repeatedly takes a request from the input queue and runs the server
--   application on it, with 'response' as the responder. The loop ends
--   when the worker is killed by the thread manager or when the
--   responder has cleared the 'ThreadContinue' flag.
worker :: WorkerConf a -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
    sinfo <- newStreamInfo
    tcont <- newThreadContinue
    timeoutKillThread mgr $ go sinfo tcont
  where
    go sinfo tcont th = do
        setThreadContinue tcont True
        ex <- E.trySyncOrAsync $ do
            -- The timeout handle is paused while waiting for a request.
            T.pause th
            Input strm req <- readInputQ
            let req' = pauseRequestBody req th
            -- Remember the stream so that it can be cleaned up
            -- if an exception interrupts the application.
            setStreamInfo sinfo strm
            T.resume th
            T.tickle th
            let aux = Aux th mySockAddr peerSockAddr
                request = Request req'
            server request aux $ response wc mgr th tcont strm request
        cont1 <- case ex of
            Right () -> return True
            Left e@(SomeException _)
                -- killed by the local worker manager
                | Just KilledByHttp2ThreadManager{} <- E.fromException e ->
                    return False
                -- killed by the local timeout manager, or any other
                -- exception: clean up the stream and keep serving
                | otherwise -> do
                    cleanup sinfo
                    return True
        cont2 <- getThreadContinue tcont
        clearStreamInfo sinfo
        when (cont1 && cont2) $ go sinfo tcont th
    -- Wrap the request body reader so that the timeout handle is paused
    -- while a body chunk is being read.
    pauseRequestBody req th = req{inpObjBody = readBody'}
      where
        readBody = inpObjBody req
        readBody' = do
            T.pause th
            bs <- readBody
            T.resume th
            return bs
    -- Run 'workerCleanup' on the recorded stream, if there is one.
    cleanup sinfo = getStreamInfo sinfo >>= mapM_ workerCleanup

----------------------------------------------------------------

-- | A reference shared by a responder and its worker.
--   The responder stores a 'Bool' in it for the worker to read once the
--   application has returned: if 'True', the worker continues to serve
--   requests; otherwise, the worker finishes.
newtype ThreadContinue = ThreadContinue (IORef Bool)

{-# INLINE newThreadContinue #-}
-- | Create a 'ThreadContinue' whose initial value is 'True'.
newThreadContinue :: IO ThreadContinue
newThreadContinue = ThreadContinue <$> newIORef True

{-# INLINE setThreadContinue #-}
-- | Overwrite the flag held by a 'ThreadContinue'.
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue ref) x = writeIORef ref x

{-# INLINE getThreadContinue #-}
-- | Read the flag held by a 'ThreadContinue'.
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue ref) = readIORef ref

----------------------------------------------------------------

-- | The type for cleaning up: a mutable slot holding the stream that
--   a worker is currently serving, or 'Nothing' when there is none.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

{-# INLINE newStreamInfo #-}
-- | Create an empty 'StreamInfo' (no stream recorded).
newStreamInfo :: IO (StreamInfo a)
newStreamInfo = StreamInfo <$> newIORef Nothing

{-# INLINE clearStreamInfo #-}
-- | Forget the recorded stream, if any.
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo (StreamInfo ref) = writeIORef ref Nothing

{-# INLINE setStreamInfo #-}
-- | Record the given stream, replacing whatever was recorded before.
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo ref) inp = writeIORef ref $ Just inp

{-# INLINE getStreamInfo #-}
-- | Read the recorded stream; 'Nothing' when none is recorded.
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo ref) = readIORef ref