{-# LANGUAGE RecordWildCards #-}

-----------------------------------------------------------------------------

-----------------------------------------------------------------------------

{- |
 Module      :  OpenTelemetry.Processor.Batch.Span
 Copyright   :  (c) Ian Duncan, 2021
 License     :  BSD-3
 Description :  Performant exporting of spans in time & space-bounded batches.
 Maintainer  :  Ian Duncan
 Stability   :  experimental
 Portability :  non-portable (GHC extensions)

 This is an implementation of the Span Processor which creates batches of finished spans and passes the export-friendly span data representations to the configured Exporter.
-}
module OpenTelemetry.Processor.Batch.Span (
  BatchTimeoutConfig (..),
  batchTimeoutConfig,
  batchProcessor,
  -- , BatchProcessorOperations
) where

import Control.Concurrent (rtsSupportsBoundThreads)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import Data.Vector (Vector)
import OpenTelemetry.Exporter.Span (SpanExporter)
import qualified OpenTelemetry.Exporter.Span as SpanExporter
import OpenTelemetry.Processor.Span
import OpenTelemetry.Trace.Core
import VectorBuilder.Builder as Builder
import VectorBuilder.Vector as Builder


-- | Configurable options for batch exporting frequency and size.
data BatchTimeoutConfig = BatchTimeoutConfig
  { -- | The maximum queue size. After the size is reached, spans are dropped.
    maxQueueSize :: Int
  , -- | The delay interval in milliseconds between two consecutive exports.
    -- The default value is 5000.
    scheduledDelayMillis :: Int
  , -- | How long the export can run before it is cancelled.
    -- The default value is 30000.
    exportTimeoutMillis :: Int
  , -- | The maximum batch size of every export. It must be
    -- smaller or equal to 'maxQueueSize'. The default value is 512.
    maxExportBatchSize :: Int
  }
  deriving (Show)


-- | Default configuration values: a queue of up to 1024 spans, a scheduled
-- export every 5 seconds, a 30 second export timeout and export batches of
-- up to 512 spans.
batchTimeoutConfig :: BatchTimeoutConfig
batchTimeoutConfig =
  BatchTimeoutConfig
    { maxQueueSize = 1024 -- spans
    , scheduledDelayMillis = 5000 -- milliseconds
    , exportTimeoutMillis = 30000 -- milliseconds
    , maxExportBatchSize = 512 -- spans
    }


-- type BatchProcessorOperations = ()

--  A multi-producer single-consumer green/blue buffer.
-- Write requests that cannot fit in the live chunk will be dropped
--
-- TODO, would be cool to use AtomicCounters for this if possible
-- data GreenBlueBuffer a = GreenBlueBuffer
--   { gbReadSection :: !(TVar Word)
--   , gbWriteGreenOrBlue :: !(TVar Word)
--   , gbPendingWrites :: !(TVar Word)
--   , gbSectionSize :: !Int
--   , gbVector :: !(M.IOVector a)
--   }

{- brainstorm: Single Word64 state sketch

  63 (high bit): green or blue
  32-62: read section
  0-31: write count
-}

{-

Green
    512       512       512       512
\|---------|---------|---------|---------|
     0         1         2         3

Blue
    512       512       512       512
\|---------|---------|---------|---------|
     0         1         2         3

The current read section denotes one chunk of length gbSize, which gets flushed
to the span exporter. Once the vector has been copied for export, gbReadSection
will be incremented.

-}

-- newGreenBlueBuffer
--   :: Int  --  Max queue size (2048)
--   -> Int  --  Export batch size (512)
--   -> IO (GreenBlueBuffer a)
-- newGreenBlueBuffer maxQueueSize batchSize = do
--   let logBase2 = finiteBitSize maxQueueSize - 1 - countLeadingZeros maxQueueSize

--   let closestFittingPowerOfTwo = 2 * if (1 `shiftL` logBase2) == maxQueueSize
--         then maxQueueSize
--         else 1 `shiftL` (logBase2 + 1)

--   readSection <- newTVarIO 0
--   writeSection <- newTVarIO 0
--   writeCount <- newTVarIO 0
--   buf <- M.new closestFittingPowerOfTwo
--   pure $ GreenBlueBuffer
--     { gbSize = maxQueueSize
--     , gbVector = buf
--     , gbReadSection = readSection
--     , gbPendingWrites = writeCount
--     }

-- isEmpty :: GreenBlueBuffer a -> STM Bool
-- isEmpty = do
--   c <- readTVar gbPendingWrites
--   pure (c == 0)

-- data InsertResult = ValueDropped | ValueInserted

-- tryInsert :: GreenBlueBuffer a -> a -> IO InsertResult
-- tryInsert GreenBlueBuffer{..} x = atomically $ do
--   c <- readTVar gbPendingWrites
--   if c == gbMaxLength
--     then pure ValueDropped
--     else do
--       greenOrBlue <- readTVar gbWriteGreenOrBlue
--       let i = c + ((M.length gbVector `shiftR` 1) `shiftL` (greenOrBlue `mod` 2))
--       M.write gbVector i x
--       writeTVar gbPendingWrites (c + 1)
--       pure ValueInserted

-- Caution, single writer means that this can't be called concurrently
-- consumeChunk :: GreenBlueBuffer a -> IO (V.Vector a)
-- consumeChunk GreenBlueBuffer{..} = atomically $ do
--   r <- readTVar gbReadSection
--   w <- readTVar gbWriteSection
--   c <- readTVar gbPendingWrites
--   when (r == w) $ do
--     modifyTVar gbWriteSection (+ 1)
--     setTVar gbPendingWrites 0
--   -- TODO slice and freeze appropriate section
-- M.slice (gbSectionSize * (r .&. gbSectionMask)

-- TODO, counters for dropped spans, exported spans

-- | A size-limited accumulator of items, grouped by the
-- 'InstrumentationLibrary' that produced them.
data BoundedMap a = BoundedMap
  { -- | Upper bound on the number of items held at once.
    itemBounds :: !Int
  , -- | Item count at which an export should be requested.
    itemMaxExportBounds :: !Int
  , -- | Number of items currently held across all keys of 'itemMap'.
    itemCount :: !Int
  , -- | The held items, as one vector builder per instrumentation library.
    itemMap :: HashMap InstrumentationLibrary (Builder.Builder a)
  }


-- | Create an empty 'BoundedMap'.
--
-- The first argument becomes 'itemBounds' and the second becomes
-- 'itemMaxExportBounds'; the map starts out holding no items.
boundedMap :: Int -> Int -> BoundedMap a
boundedMap queueLimit exportLimit =
  BoundedMap
    { itemBounds = queueLimit
    , itemMaxExportBounds = exportLimit
    , itemCount = 0
    , itemMap = HashMap.empty
    }


-- | Try to add a finished span to the map, filed under the name of the
-- tracer that created it.
--
-- Returns 'Nothing' when the map already holds 'itemBounds' items, in which
-- case the caller keeps the old map and the span is not stored. Otherwise
-- returns the updated map, forced to weak head normal form.
--
-- The capacity check compares the /current/ count against the bound, so a
-- map created with bound @n@ accepts exactly @n@ spans before rejecting.
-- (Comparing @itemCount m + 1@ with @>=@ would reject the span that fills the
-- last free slot, capping the map at @n - 1@ spans.)
push :: ImmutableSpan -> BoundedMap ImmutableSpan -> Maybe (BoundedMap ImmutableSpan)
push s m
  | itemCount m >= itemBounds m = Nothing
  | otherwise =
      Just $!
        m
          { itemCount = itemCount m + 1
          , itemMap =
              HashMap.insertWith
                (<>)
                (tracerName (spanTracer s))
                (Builder.singleton s)
                (itemMap m)
          }


-- | Split a 'BoundedMap' into an emptied map and the batch to export.
--
-- The first component keeps the configured bounds but has its count reset to
-- zero and its item map cleared. The second component holds every
-- accumulated builder materialised into a 'Vector', keyed as before.
buildExport :: BoundedMap a -> (BoundedMap a, HashMap InstrumentationLibrary (Vector a))
buildExport m = (emptied, exported)
  where
    emptied = m {itemCount = 0, itemMap = HashMap.empty}
    exported = fmap Builder.build (itemMap m)


-- | The reason the batch worker woke up.
data ProcessorMessage
  = -- | The scheduled delay between exports elapsed.
    ScheduledFlush
  | -- | The work signal was raised (batch reached export size, queue was
    -- full, or a force flush was requested).
    MaxExportFlush
  | -- | The shutdown signal was raised.
    Shutdown


-- note: [Unmasking Asyncs]
--
-- It is possible to create unkillable asyncs. Behold:
--
-- ```
-- a <- uninterruptibleMask_ $ do
--     async $ do
--         forever $ do
--             threadDelay 10_000
--             putStrLn "still alive"
-- cancel a
-- ```
--
-- The prior code block will never successfully cancel `a` and will be
-- blocked forever. The reason is that `cancel` sends an async exception to
-- the thread performing the action, but the `uninterruptibleMask` state is
-- inherited by the forked thread. This means that *no async exceptions*
-- can reach it, and `cancel` will therefore run forever.
--
-- This also affects `timeout`, which uses an async exception to kill the
-- running job. If the action is done in an uninterruptible masked state,
-- then the timeout will not successfully kill the running action.

{- |
 The batch processor accepts spans and places them into batches. Batching helps better compress the data and reduce the number of outgoing connections
 required to transmit the data. This processor supports both size and time based batching.

 A background worker exports the accumulated spans whenever the scheduled
 delay elapses, the work signal is raised (the batch reached
 'maxExportBatchSize', a span could not be queued, or a force flush was
 requested), or shutdown is requested. On shutdown the worker keeps exporting
 until the queue is empty; the shutdown action waits at most
 'exportTimeoutMillis' for that to finish and then cancels the worker.

 Calling the shutdown action more than once is safe: the shutdown signal is
 raised with a non-blocking put, so a later call never blocks on a signal that
 was left unconsumed (for example because an earlier shutdown timed out).

 NOTE: this function requires the program be compiled with the @-threaded@ GHC
 option and will throw an error if this is not the case.
-}
batchProcessor :: (MonadIO m) => BatchTimeoutConfig -> SpanExporter -> m SpanProcessor
batchProcessor BatchTimeoutConfig {..} exporter = liftIO $ do
  unless rtsSupportsBoundThreads $
    error "The hs-opentelemetry batch processor does not work without the -threaded GHC flag!"
  batch <- newIORef $ boundedMap maxQueueSize maxExportBatchSize
  -- Raised by producers / force flush to ask the worker for an export.
  workSignal <- newEmptyTMVarIO
  -- Raised by the shutdown action to ask the worker to drain and stop.
  shutdownSignal <- newEmptyTMVarIO

  let publish batchToProcess = mask_ $ do
        -- we mask async exceptions in this, so that a buggy exporter that
        -- catches async exceptions won't swallow them. since we use
        -- an interruptible mask, blocking calls can still be killed, like
        -- `threadDelay` or `putMVar` or most file I/O operations.
        --
        -- if we've received a shutdown, then we should be expecting
        -- a `cancel` anytime now.
        SpanExporter.spanExporterExport exporter batchToProcess

  -- Export repeatedly until the queue is observed empty, returning the
  -- result of the last export (or the given result if nothing was queued).
  let flushQueueImmediately ret = do
        batchToProcess <- atomicModifyIORef' batch buildExport
        if null batchToProcess
          then pure ret
          else do
            ret' <- publish batchToProcess
            flushQueueImmediately ret'

  -- Block until one of the wake-up conditions holds and report which one.
  let waiting = do
        delay <- registerDelay (millisToMicros scheduledDelayMillis)
        atomically $
          msum
            -- Flush every scheduled delay time, when we've reached the max export size, or when the shutdown signal is received.
            [ ScheduledFlush <$ do
                continue <- readTVar delay
                check continue
            , MaxExportFlush <$ takeTMVar workSignal
            , Shutdown <$ takeTMVar shutdownSignal
            ]

  let workerAction = do
        req <- waiting
        batchToProcess <- atomicModifyIORef' batch buildExport
        res <- publish batchToProcess

        -- if we were asked to shutdown, stop waiting and flush it all out
        case req of
          Shutdown ->
            flushQueueImmediately res
          _ ->
            workerAction

  -- see note [Unmasking Asyncs]
  worker <- asyncWithUnmask $ \unmask -> unmask workerAction

  pure $
    SpanProcessor
      { spanProcessorOnStart = \_ _ -> pure ()
      , spanProcessorOnEnd = \s -> do
          span_ <- readIORef s
          appendFailedOrExportNeeded <- atomicModifyIORef' batch $ \builder ->
            case push span_ builder of
              -- The span could not be queued: keep the batch as it was and
              -- prompt the worker to export so that room is made.
              Nothing -> (builder, True)
              Just b' ->
                if itemCount b' >= itemMaxExportBounds b'
                  then -- If the batch has grown to the maximum export size, prompt the worker to export it.
                    (b', True)
                  else (b', False)
          when appendFailedOrExportNeeded $ void $ atomically $ tryPutTMVar workSignal ()
      , -- Only raises the work signal; it does not wait for the export to finish.
        spanProcessorForceFlush = void $ atomically $ tryPutTMVar workSignal ()
      , -- TODO where to call restore, if anywhere?
        spanProcessorShutdown =
          asyncWithUnmask $ \unmask -> unmask $ do
            -- we call asyncWithUnmask here because the shutdown action is
            -- likely to happen inside of a `finally` or `bracket`. the
            -- @safe-exceptions@ pattern (followed by unliftio as well)
            -- will use uninterruptibleMask in an exception cleanup. the
            -- uninterruptibleMask state means that the `timeout` call
            -- below will never exit, because `wait worker` will be in the
            -- `uninterruptibleMasked` state, and the timeout async
            -- exception will not be delivered.
            --
            -- see note [Unmasking Asyncs]
            mask_ $ do
              -- is it a little silly that we unmask and remask? seems
              -- silly! but the `mask` here is doing an interruptible mask.
              -- which means that async exceptions can still be delivered
              -- if a process is blocking.

              -- flush remaining messages and signal the worker to shutdown.
              -- a non-blocking put is used so that a repeated shutdown does
              -- not block forever on a signal nobody is left to consume.
              void $ atomically $ tryPutTMVar shutdownSignal ()

              -- gracefully wait for the worker to stop. we may be in
              -- a `bracket` or responding to an async exception, so we
              -- must be very careful not to wait too long. the following
              -- STM action will block, so we'll be susceptible to an async
              -- exception.
              delay <- registerDelay (millisToMicros exportTimeoutMillis)
              shutdownResult <-
                atomically $
                  msum
                    [ Just <$> waitCatchSTM worker
                    , Nothing <$ do
                        shouldStop <- readTVar delay
                        check shouldStop
                    ]

              -- make sure the worker comes down if we timed out.
              cancel worker
              -- TODO, not convinced we should shut down processor here

              pure $ case shutdownResult of
                Nothing -> ShutdownTimeout
                Just (Left _) -> ShutdownFailure
                Just (Right _) -> ShutdownSuccess
      }
  where
    -- 'registerDelay' takes microseconds; the configuration is in milliseconds.
    millisToMicros :: Int -> Int
    millisToMicros millis = millis * 1000

{-
buffer <- newGreenBlueBuffer _ _
batchProcessorAction <- async $ forever $ do
  -- It would be nice to do an immediate send when possible
  chunk <- if (sendDelay == 0)
    else consumeChunk
    then threadDelay sendDelay >> consumeChunk
  timeout _ $ export exporter chunk
pure $ Processor
  { onStart = \_ _ -> pure ()
  , onEnd = \s -> void $ tryInsert buffer s
  , shutdown = do
      gracefullyShutdownBatchProcessor

  , forceFlush = pure ()
  }
where
  sendDelay = scheduledDelayMilis * 1_000
-}